python/pyhive/presto.py (199 lines of code) (raw):

"""DB-API implementation backed by Presto

See http://www.python.org/dev/peps/pep-0249/

Many docstrings in this file are based on the PEP, which is in the public domain.
"""

from __future__ import absolute_import
from __future__ import unicode_literals

from builtins import object
from decimal import Decimal

from pyhive import common
from pyhive.common import DBAPITypeObject
# Make all exceptions visible in this module per DB-API
from pyhive.exc import *  # noqa
import base64
import getpass
import datetime
import logging
import requests
from requests.auth import HTTPBasicAuth
import os

try:  # Python 3
    import urllib.parse as urlparse
except ImportError:  # Python 2
    import urlparse


# PEP 249 module globals
apilevel = '2.0'
threadsafety = 2  # Threads may share the module and connections.
paramstyle = 'pyformat'  # Python extended format codes, e.g. ...WHERE name=%(name)s

_logger = logging.getLogger(__name__)

# Maps a lower-cased Presto base type name (the part before any "(", so
# "decimal(10,2)" looks up "decimal") to a callable applied to each non-null
# result cell of that type.
TYPES_CONVERTER = {
    "decimal": Decimal,
    # As of Presto 0.69, binary data is returned as the varbinary type in base64 format
    "varbinary": base64.b64decode
}

# requests keyword arguments the cursor always supplies itself (the HTTP verb is
# implied by post/get/delete, and url/data/headers are passed explicitly), so a
# caller may not also pass them through ``requests_kwargs``.
_RESERVED_REQUESTS_KWARGS = ('method', 'url', 'data', 'headers')


def _check_reserved_requests_kwargs(requests_kwargs):
    """Raise ``ValueError`` if ``requests_kwargs`` tries to override a reserved argument."""
    for k in _RESERVED_REQUESTS_KWARGS:
        if k in requests_kwargs:
            raise ValueError("Cannot override requests argument {}".format(k))


def _kerberos_hostname_override(use_canonical_hostname, host):
    """Return the ``hostname_override`` value to hand to ``HTTPKerberosAuth``.

    :param use_canonical_hostname: ``None`` (default: canonicalization on), a bool,
        or a string; the strings ``'false'`` (any letter case) and the bool ``False``
        turn canonicalization off.
    :param host: the configured Presto host name.
    :returns: ``host`` when canonicalization is turned off, else ``None``.
    """
    if use_canonical_hostname is None:
        return None
    if isinstance(use_canonical_hostname, bool):
        # Documented as a boolean; the original code only handled strings and
        # crashed with AttributeError on ``False``.
        disabled = not use_canonical_hostname
    else:
        disabled = str(use_canonical_hostname).lower() == 'false'
    return host if disabled else None


class PrestoParamEscaper(common.ParamEscaper):
    """Parameter escaper that renders ``date``/``datetime`` values as Presto typed literals."""

    def escape_datetime(self, item, format):
        # datetime.datetime subclasses datetime.date, so it has to be tested for first.
        _type = "timestamp" if isinstance(item, datetime.datetime) else "date"
        # NOTE(review): the extra ``3`` is forwarded to the base implementation in
        # pyhive.common (presumably a fractional-seconds precision) -- confirm there.
        formatted = super(PrestoParamEscaper, self).escape_datetime(item, format, 3)
        return "{} {}".format(_type, formatted)


_escaper = PrestoParamEscaper()


def connect(*args, **kwargs):
    """Constructor for creating a connection to the database. See class :py:class:`Connection` for
    arguments.

    :returns: a :py:class:`Connection` object.
    """
    return Connection(*args, **kwargs)


class Connection(object):
    """Presto does not have a notion of a persistent connection.

    Thus, these objects are small stateless factories for cursors, which do all the real work.
    """

    def __init__(self, *args, **kwargs):
        # Stored verbatim and forwarded to every Cursor this connection creates.
        self._args = args
        self._kwargs = kwargs

    def close(self):
        """Presto does not have anything to close"""
        # TODO cancel outstanding queries?
        pass

    def commit(self):
        """Presto does not support transactions"""
        pass

    def cursor(self):
        """Return a new :py:class:`Cursor` object using the connection."""
        return Cursor(*self._args, **self._kwargs)

    def rollback(self):
        raise NotSupportedError("Presto does not have transactions")  # pragma: no cover


class Cursor(common.DBAPICursor):
    """These objects represent a database cursor, which is used to manage the context of a fetch
    operation.

    Cursors are not isolated, i.e., any changes done to the database by a cursor are immediately
    visible by other cursors or connections.
    """

    def __init__(self, host, port='8080', username=None, principal_username=None, catalog='hive',
                 schema='default', poll_interval=1, source='pyhive', session_props=None,
                 protocol='http', password=None, requests_session=None, requests_kwargs=None,
                 KerberosRemoteServiceName=None, KerberosPrincipal=None,
                 KerberosConfigPath=None, KerberosKeytabPath=None,
                 KerberosCredentialCachePath=None, KerberosUseCanonicalHostname=None):
        """
        :param host: hostname to connect to, e.g. ``presto.example.com``
        :param port: int -- port, defaults to 8080
        :param username: string -- defaults to system user name
        :param principal_username: string -- defaults to ``username`` argument if it exists,
            else defaults to system user name
        :param catalog: string -- defaults to ``hive``
        :param schema: string -- defaults to ``default``
        :param poll_interval: float -- how often to ask the Presto REST interface for a progress
            update, defaults to a second
        :param source: string -- arbitrary identifier (shows up in the Presto monitoring page)
        :param session_props: dict -- session properties sent as ``X-Presto-Session``; updated in
            place when the server sets or clears session properties. Defaults to an empty dict.
        :param protocol: string -- network protocol, valid options are ``http`` and ``https``.
            defaults to ``http``
        :param password: string -- Deprecated. Defaults to ``None``.
            Using BasicAuth, requires ``https``.
            The BasicAuth user is ``username``, or the system user name if ``username`` is not given.
            Prefer ``requests_kwargs={'auth': HTTPBasicAuth(username, password)}``.
            May not be specified with ``requests_kwargs['auth']``.
            Ignored when Kerberos is configured.
        :param requests_session: a ``requests.Session`` object for advanced usage. If absent, this
            class will use the default requests behavior of making a new session per HTTP request.
            Caller is responsible for closing session.
        :param requests_kwargs: Additional ``**kwargs`` to pass to requests. May not contain
            ``method``, ``url``, ``data`` or ``headers``.
        :param KerberosRemoteServiceName: string -- Presto coordinator Kerberos service name.
            This parameter is required for Kerberos authentication.
        :param KerberosPrincipal: string -- The principal to use when authenticating to
            the Presto coordinator.
        :param KerberosConfigPath: string -- Kerberos configuration file.
            (default: /etc/krb5.conf). Exported as the ``KRB5_CONFIG`` environment variable.
        :param KerberosKeytabPath: string -- Kerberos keytab file. Exported as the
            ``KRB5_CLIENT_KTNAME`` environment variable.
        :param KerberosCredentialCachePath: string -- Kerberos credential cache. Exported as the
            ``KRB5CCNAME`` environment variable.
        :param KerberosUseCanonicalHostname: boolean (or the strings ``'true'``/``'false'``) --
            Use the canonical hostname of the Presto coordinator for the Kerberos service principal
            by first resolving the hostname to an IP address and then doing a reverse DNS lookup
            for that IP address. This is enabled by default.
        :raises ValueError: on an unsupported ``protocol``, a ``password`` combined with
            ``requests_kwargs['auth']`` or with a non-``https`` protocol, or reserved keys in
            ``requests_kwargs``.
        """
        super(Cursor, self).__init__(poll_interval)
        # Config
        self._host = host
        self._port = port
        # Presto User Impersonation:
        # https://docs.starburstdata.com/latest/security/impersonation.html
        #
        # User impersonation allows the execution of queries in Presto based on the
        # principal_username argument, instead of executing the query as the account which
        # authenticated against Presto (usually a service account).
        #
        # Allows for a service account to authenticate with Presto, and then leverage the
        # principal_username as the user Presto will execute the query as. This is required by
        # applications that leverage authentication methods like SAML, where the application has
        # a username, but not a password, to still leverage user-specific Presto Resource Groups
        # and Authorization rules that would not be applied when only using a shared service
        # account. This also allows auditing of who is executing a query in these environments,
        # instead of having all queries run by the shared service account.
        self._username = principal_username or username or getpass.getuser()
        self._catalog = catalog
        self._schema = schema
        self._arraysize = 1
        self._poll_interval = poll_interval
        self._source = source
        self._session_props = session_props if session_props is not None else {}
        self.last_query_id = None

        if protocol not in ('http', 'https'):
            raise ValueError("Protocol must be http/https, was {!r}".format(protocol))
        self._protocol = protocol

        # The ``requests`` module itself exposes post/get/delete, so it can stand in
        # for a Session object.
        self._requests_session = requests_session or requests

        # Copy so the caller's dict is never mutated when 'auth' is filled in below.
        requests_kwargs = dict(requests_kwargs) if requests_kwargs is not None else {}

        if KerberosRemoteServiceName is not None:
            # Validate before touching os.environ so a rejected call leaves no side effects.
            # Without this check the reserved keys only failed later, inside requests,
            # with a duplicate-keyword TypeError.
            _check_reserved_requests_kwargs(requests_kwargs)

            from requests_kerberos import HTTPKerberosAuth, OPTIONAL

            hostname_override = _kerberos_hostname_override(KerberosUseCanonicalHostname, host)

            # These are process-wide environment variables, visible to every other
            # Kerberos user in the process.
            if KerberosConfigPath is not None:
                os.environ['KRB5_CONFIG'] = KerberosConfigPath
            if KerberosKeytabPath is not None:
                os.environ['KRB5_CLIENT_KTNAME'] = KerberosKeytabPath
            if KerberosCredentialCachePath is not None:
                os.environ['KRB5CCNAME'] = KerberosCredentialCachePath

            # Replaces any 'auth' the caller supplied in requests_kwargs.
            requests_kwargs['auth'] = HTTPKerberosAuth(mutual_authentication=OPTIONAL,
                                                       principal=KerberosPrincipal,
                                                       service=KerberosRemoteServiceName,
                                                       hostname_override=hostname_override)
        else:
            if password is not None and 'auth' in requests_kwargs:
                raise ValueError("Cannot use both password and requests_kwargs authentication")
            _check_reserved_requests_kwargs(requests_kwargs)
            if password is not None:
                # Authenticate as the configured user, falling back to the system user
                # (as documented for ``username``) instead of sending ``None``.
                requests_kwargs['auth'] = HTTPBasicAuth(username or getpass.getuser(), password)
                if protocol != 'https':
                    raise ValueError("Protocol must be https when passing a password")

        self._requests_kwargs = requests_kwargs

        self._reset_state()

    def _reset_state(self):
        """Reset state about the previous query in preparation for running another query"""
        super(Cursor, self)._reset_state()
        self._nextUri = None
        self._columns = None

    @property
    def description(self):
        """This read-only attribute is a sequence of 7-item sequences.

        Each of these sequences contains information describing one result column:

        - name
        - type_code
        - display_size (None in current implementation)
        - internal_size (None in current implementation)
        - precision (None in current implementation)
        - scale (None in current implementation)
        - null_ok (always True in current implementation)

        The ``type_code`` can be interpreted by comparing it to the Type Objects specified in the
        section below.

        Returns ``None`` if the query finished (or none was started) without reporting columns.
        """
        # Sleep until we're done or we got the columns
        self._fetch_while(
            lambda: self._columns is None and
            self._state not in (self._STATE_NONE, self._STATE_FINISHED)
        )
        if self._columns is None:
            return None
        return [
            # name, type_code, display_size, internal_size, precision, scale, null_ok
            (col['name'], col['type'], None, None, None, None, True)
            for col in self._columns
        ]

    def execute(self, operation, parameters=None):
        """Prepare and execute a database operation (query or command).

        Return values are not defined.

        :param operation: SQL text, with ``pyformat`` placeholders when ``parameters`` is given.
        :param parameters: optional mapping/sequence interpolated into ``operation`` after
            escaping.
        :raises OperationalError: if the initial POST does not return HTTP 200.
        :raises DatabaseError: if the first response already reports a query error.
        """
        headers = {
            'X-Presto-Catalog': self._catalog,
            'X-Presto-Schema': self._schema,
            'X-Presto-Source': self._source,
            'X-Presto-User': self._username,
        }

        if self._session_props:
            headers['X-Presto-Session'] = ','.join(
                '{}={}'.format(propname, propval)
                for propname, propval in self._session_props.items()
            )

        # Prepare statement
        if parameters is None:
            sql = operation
        else:
            sql = operation % _escaper.escape_args(parameters)

        self._reset_state()

        self._state = self._STATE_RUNNING
        url = urlparse.urlunparse((
            self._protocol,
            '{}:{}'.format(self._host, self._port), '/v1/statement', None, None, None))
        _logger.info('%s', sql)
        _logger.debug("Headers: %s", headers)
        response = self._requests_session.post(
            url, data=sql.encode('utf-8'), headers=headers, **self._requests_kwargs)
        self._process_response(response)

    def cancel(self):
        """Cancel the running query by sending DELETE to its ``nextUri``.

        A query that has already finished is left alone.

        :raises ProgrammingError: if no query has been started.
        :raises OperationalError: if the server does not answer with HTTP 204.
        """
        if self._state == self._STATE_NONE:
            raise ProgrammingError("No query yet")
        if self._nextUri is None:
            assert self._state == self._STATE_FINISHED, "Should be finished if nextUri is None"
            return

        response = self._requests_session.delete(self._nextUri, **self._requests_kwargs)
        if response.status_code != requests.codes.no_content:
            fmt = "Unexpected status code after cancel {}\n{}"
            raise OperationalError(fmt.format(response.status_code, response.content))

        self._state = self._STATE_FINISHED
        self._nextUri = None

    def poll(self):
        """Poll for and return the raw status data provided by the Presto REST API.

        :returns: dict -- JSON status information or ``None`` if the query is done
        :raises: ``ProgrammingError`` when no query has been started

        .. note::
            This is not a part of DB-API.
        """
        if self._state == self._STATE_NONE:
            raise ProgrammingError("No query yet")
        if self._nextUri is None:
            assert self._state == self._STATE_FINISHED, "Should be finished if nextUri is None"
            return None
        response = self._requests_session.get(self._nextUri, **self._requests_kwargs)
        self._process_response(response)
        # Parse again on purpose: _process_response converts the 'data' rows of its own
        # parse in place, while poll() promises the raw payload.
        return response.json()

    def _fetch_more(self):
        """Fetch the next URI and update state"""
        self._process_response(self._requests_session.get(self._nextUri, **self._requests_kwargs))

    def _process_data(self, rows):
        """Convert, in place, every non-null cell whose column type is in ``TYPES_CONVERTER``.

        :param rows: list of mutable rows (lists) from the response's ``data`` field, laid out
            in the same order as ``self._columns``.
        """
        for i, col in enumerate(self._columns):
            # Drop type parameters, e.g. "decimal(10,2)" -> "decimal".
            converter = TYPES_CONVERTER.get(col['type'].split("(")[0].lower())
            if converter is None:
                continue
            for row in rows:
                if row[i] is not None:
                    row[i] = converter(row[i])

    def _process_response(self, response):
        """Given the JSON response from Presto's REST API, update the internal state with the next
        URI and any data from the response

        :raises OperationalError: on any HTTP status other than 200.
        :raises DatabaseError: if the payload contains an ``error`` entry.
        """
        # TODO handle HTTP 503
        if response.status_code != requests.codes.ok:
            fmt = "Unexpected status code {}\n{}"
            raise OperationalError(fmt.format(response.status_code, response.content))

        response_json = response.json()
        _logger.debug("Got response %s", response_json)
        assert self._state == self._STATE_RUNNING, "Should be running if processing response"
        self._nextUri = response_json.get('nextUri')
        self._columns = response_json.get('columns')
        if 'id' in response_json:
            self.last_query_id = response_json['id']
        # Mirror server-side SET SESSION / RESET SESSION into the props sent with later queries.
        if 'X-Presto-Clear-Session' in response.headers:
            propname = response.headers['X-Presto-Clear-Session']
            self._session_props.pop(propname, None)
        if 'X-Presto-Set-Session' in response.headers:
            propname, propval = response.headers['X-Presto-Set-Session'].split('=', 1)
            self._session_props[propname] = propval
        if 'data' in response_json:
            assert self._columns
            new_data = response_json['data']
            self._process_data(new_data)
            self._data += map(tuple, new_data)
        # No nextUri means the server has nothing more to send for this query.
        if 'nextUri' not in response_json:
            self._state = self._STATE_FINISHED
        if 'error' in response_json:
            raise DatabaseError(response_json['error'])
#
# Type Objects and Constructors
#

# DB-API type objects, each built from the Presto type name(s) listed with it.
# Source of the names: presto-main/src/main/java/com/facebook/presto/tuple/TupleInfo.java
# (NOTE(review): that path is from an old Presto release -- confirm it is still accurate).
FIXED_INT_64 = DBAPITypeObject(["bigint"])
# NOTE(review): despite its name this object wraps the character type "varchar",
# not "varbinary"; kept as-is for compatibility.
VARIABLE_BINARY = DBAPITypeObject(["varchar"])
DOUBLE = DBAPITypeObject(["double"])
BOOLEAN = DBAPITypeObject(["boolean"])