#
# Copyright (C) 2009-2020 the sqlparse authors and contributors
#
# This module is part of python-sqlparse and is released under
# the BSD License: https://opensource.org/licenses/BSD-3-Clause

"""SQL Lexer"""
import re

# This code is based on the SqlLexer in pygments.
# http://pygments.org/
# It's separated from the rest of pygments to increase performance
# and to allow some customizations.

from io import TextIOBase

from sqlparse import tokens, keywords
from sqlparse.utils import consume


class Lexer:
    """The Lexer supports configurable syntax.

    To add support for additional keywords, use the `add_keywords` method."""

    _default_instance = None

    # Development notes:
    # - This class is prepared to support additional SQL dialects in the
    #   future by adding functions that take the place of
    #   default_initialization().
    # - The lexer class uses explicit singleton behavior with the
    #   instance-getter method get_default_instance(). This mechanism has
    #   the advantage that the call signatures of the entry points to the
    #   sqlparse library are not affected, and usage of sqlparse in third
    #   party code does not need to be adapted. On the other hand, singleton
    #   behavior is not thread safe, and the current implementation does not
    #   easily allow multiple SQL dialects to be parsed in the same process.
    #   Such behavior can be supported in the future by passing a suitably
    #   initialized lexer object as an additional parameter to the
    #   entry-point functions (such as `parse`). Code will need to be written
    #   to pass down and utilize such an object. The current implementation
    #   is prepared to support this thread-safe approach without the
    #   default-instance part needing to change its interface.

    @classmethod
    def get_default_instance(cls):
        """Returns the lexer instance used internally
        by the sqlparse core functions."""
        if cls._default_instance is None:
            cls._default_instance = cls()
            cls._default_instance.default_initialization()
        return cls._default_instance

    def default_initialization(self):
        """Initialize the lexer with default dictionaries.

        Useful if you need to revert custom syntax settings."""
        self.clear()
        self.set_SQL_REGEX(keywords.SQL_REGEX)
        self.add_keywords(keywords.KEYWORDS_COMMON)
        self.add_keywords(keywords.KEYWORDS_ORACLE)
        self.add_keywords(keywords.KEYWORDS_PLPGSQL)
        self.add_keywords(keywords.KEYWORDS_HQL)
        self.add_keywords(keywords.KEYWORDS_MSACCESS)
        self.add_keywords(keywords.KEYWORDS)

    def clear(self):
        """Clear all syntax configurations.

        Useful if you want to load a reduced set of syntax configurations.
        After this call, regexps and keyword dictionaries need to be loaded
        to make the lexer functional again."""
        self._SQL_REGEX = []
        self._keywords = []

    def set_SQL_REGEX(self, SQL_REGEX):
        """Set the list of regexes that will parse the SQL."""
        FLAGS = re.IGNORECASE | re.UNICODE
        self._SQL_REGEX = [
            (re.compile(rx, FLAGS).match, tt)
            for rx, tt in SQL_REGEX
        ]

    def add_keywords(self, keywords):
        """Add a keyword dictionary.

        Keywords are looked up in the same order that dictionaries
        were added."""
        self._keywords.append(keywords)

    def is_keyword(self, value):
        """Checks for a keyword.

        If the given value is in one of the KEYWORDS_* dictionaries
        it's considered a keyword. Otherwise, tokens.Name is returned.
        """
        val = value.upper()
        for kwdict in self._keywords:
            if val in kwdict:
                return kwdict[val], value
        return tokens.Name, value

    def get_tokens(self, text, encoding=None):
        """Return an iterable of (tokentype, value) pairs generated from
        `text`.

        ``text`` may be a string, a bytes object or a file-like object.
        Bytes are decoded using ``encoding`` if given, otherwise UTF-8
        with a fallback to ``unicode-escape``.
        """
        if isinstance(text, TextIOBase):
            text = text.read()

        if isinstance(text, str):
            pass
        elif isinstance(text, bytes):
            if encoding:
                text = text.decode(encoding)
            else:
                try:
                    text = text.decode('utf-8')
                except UnicodeDecodeError:
                    text = text.decode('unicode-escape')
        else:
            raise TypeError("Expected text or file-like object, got {!r}".
                            format(type(text)))

        iterable = enumerate(text)
        for pos, char in iterable:
            for rexmatch, action in self._SQL_REGEX:
                m = rexmatch(text, pos)

                if not m:
                    continue
                elif isinstance(action, tokens._TokenType):
                    yield action, m.group()
                elif action is keywords.PROCESS_AS_KEYWORD:
                    yield self.is_keyword(m.group())

                # Advance the enumerate-based scan past the matched text so
                # the next iteration starts at the following character.
                consume(iterable, m.end() - pos - 1)
                break
            else:
                # No regex matched at this position; emit an error token for
                # the single character.
                yield tokens.Error, char


def tokenize(sql, encoding=None):
    """Tokenize sql.

    Tokenize *sql* using the :class:`Lexer` and return a 2-tuple stream
    of ``(token type, value)`` items.
    """
    return Lexer.get_default_instance().get_tokens(sql, encoding)
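

# The block below is a minimal usage sketch, not part of the library API: it
# shows how the singleton lexer can be extended via `add_keywords` and how
# `tokenize` yields (tokentype, value) pairs. The keyword 'ZORDER' and the
# sample statement are illustrative assumptions only.
if __name__ == '__main__':
    lexer = Lexer.get_default_instance()
    # Keywords registered here are looked up after the built-in dictionaries,
    # in the order they were added.
    lexer.add_keywords({'ZORDER': tokens.Keyword})

    for ttype, value in tokenize('SELECT a FROM b ZORDER BY a;'):
        print(ttype, repr(value))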