python/pyhive/presto.py (199 lines of code) (raw):
"""DB-API implementation backed by Presto
See http://www.python.org/dev/peps/pep-0249/
Many docstrings in this file are based on the PEP, which is in the public domain.
"""
from __future__ import absolute_import
from __future__ import unicode_literals
from builtins import object
from decimal import Decimal
from pyhive import common
from pyhive.common import DBAPITypeObject
# Make all exceptions visible in this module per DB-API
from pyhive.exc import * # noqa
import base64
import getpass
import datetime
import logging
import requests
from requests.auth import HTTPBasicAuth
import os
try: # Python 3
import urllib.parse as urlparse
except ImportError: # Python 2
import urlparse
# PEP 249 module globals
apilevel = '2.0'
threadsafety = 2 # Threads may share the module and connections.
paramstyle = 'pyformat' # Python extended format codes, e.g. ...WHERE name=%(name)s
_logger = logging.getLogger(__name__)
TYPES_CONVERTER = {
"decimal": Decimal,
# As of Presto 0.69, binary data is returned as the varbinary type in base64 format
"varbinary": base64.b64decode
}
class PrestoParamEscaper(common.ParamEscaper):
def escape_datetime(self, item, format):
_type = "timestamp" if isinstance(item, datetime.datetime) else "date"
formatted = super(PrestoParamEscaper, self).escape_datetime(item, format, 3)
return "{} {}".format(_type, formatted)
_escaper = PrestoParamEscaper()
def connect(*args, **kwargs):
"""Constructor for creating a connection to the database. See class :py:class:`Connection` for
arguments.
:returns: a :py:class:`Connection` object.
"""
return Connection(*args, **kwargs)
class Connection(object):
"""Presto does not have a notion of a persistent connection.
Thus, these objects are small stateless factories for cursors, which do all the real work.
"""
def __init__(self, *args, **kwargs):
self._args = args
self._kwargs = kwargs
def close(self):
"""Presto does not have anything to close"""
# TODO cancel outstanding queries?
pass
def commit(self):
"""Presto does not support transactions"""
pass
def cursor(self):
"""Return a new :py:class:`Cursor` object using the connection."""
return Cursor(*self._args, **self._kwargs)
def rollback(self):
raise NotSupportedError("Presto does not have transactions") # pragma: no cover
class Cursor(common.DBAPICursor):
"""These objects represent a database cursor, which is used to manage the context of a fetch
operation.
Cursors are not isolated, i.e., any changes done to the database by a cursor are immediately
visible by other cursors or connections.
"""
def __init__(self, host, port='8080', username=None, principal_username=None, catalog='hive',
schema='default', poll_interval=1, source='pyhive', session_props=None,
protocol='http', password=None, requests_session=None, requests_kwargs=None,
KerberosRemoteServiceName=None, KerberosPrincipal=None,
KerberosConfigPath=None, KerberosKeytabPath=None,
KerberosCredentialCachePath=None, KerberosUseCanonicalHostname=None):
"""
:param host: hostname to connect to, e.g. ``presto.example.com``
:param port: int -- port, defaults to 8080
:param username: string -- defaults to system user name
:param principal_username: string -- defaults to ``username`` argument if it exists,
else defaults to system user name
:param catalog: string -- defaults to ``hive``
:param schema: string -- defaults to ``default``
:param poll_interval: float -- how often to ask the Presto REST interface for a progress
update, defaults to a second
:param source: string -- arbitrary identifier (shows up in the Presto monitoring page)
:param protocol: string -- network protocol, valid options are ``http`` and ``https``.
defaults to ``http``
:param password: string -- Deprecated. Defaults to ``None``.
Using BasicAuth, requires ``https``.
Prefer ``requests_kwargs={'auth': HTTPBasicAuth(username, password)}``.
May not be specified with ``requests_kwargs['auth']``.
:param requests_session: a ``requests.Session`` object for advanced usage. If absent, this
class will use the default requests behavior of making a new session per HTTP request.
Caller is responsible for closing session.
:param requests_kwargs: Additional ``**kwargs`` to pass to requests
:param KerberosRemoteServiceName: string -- Presto coordinator Kerberos service name.
This parameter is required for Kerberos authentiation.
:param KerberosPrincipal: string -- The principal to use when authenticating to
the Presto coordinator.
:param KerberosConfigPath: string -- Kerberos configuration file.
(default: /etc/krb5.conf)
:param KerberosKeytabPath: string -- Kerberos keytab file.
:param KerberosCredentialCachePath: string -- Kerberos credential cache.
:param KerberosUseCanonicalHostname: boolean -- Use the canonical hostname of the
Presto coordinator for the Kerberos service principal by first resolving the
hostname to an IP address and then doing a reverse DNS lookup for that IP address.
This is enabled by default.
"""
super(Cursor, self).__init__(poll_interval)
# Config
self._host = host
self._port = port
"""
Presto User Impersonation: https://docs.starburstdata.com/latest/security/impersonation.html
User impersonation allows the execution of queries in Presto based on principal_username
argument, instead of executing the query as the account which authenticated against Presto.
(Usually a service account)
Allows for a service account to authenticate with Presto, and then leverage the
principal_username as the user Presto will execute the query as. This is required by
applications that leverage authentication methods like SAML, where the application has a
username, but not a password to still leverage user specific Presto Resource Groups and
Authorization rules that would not be applied when only using a shared service account.
This also allows auditing of who is executing a query in these environments, instead of
having all queryes run by the shared service account.
"""
self._username = principal_username or username or getpass.getuser()
self._catalog = catalog
self._schema = schema
self._arraysize = 1
self._poll_interval = poll_interval
self._source = source
self._session_props = session_props if session_props is not None else {}
self.last_query_id = None
if protocol not in ('http', 'https'):
raise ValueError("Protocol must be http/https, was {!r}".format(protocol))
self._protocol = protocol
self._requests_session = requests_session or requests
requests_kwargs = dict(requests_kwargs) if requests_kwargs is not None else {}
if KerberosRemoteServiceName is not None:
from requests_kerberos import HTTPKerberosAuth, OPTIONAL
hostname_override = None
if KerberosUseCanonicalHostname is not None \
and KerberosUseCanonicalHostname.lower() == 'false':
hostname_override = host
if KerberosConfigPath is not None:
os.environ['KRB5_CONFIG'] = KerberosConfigPath
if KerberosKeytabPath is not None:
os.environ['KRB5_CLIENT_KTNAME'] = KerberosKeytabPath
if KerberosCredentialCachePath is not None:
os.environ['KRB5CCNAME'] = KerberosCredentialCachePath
requests_kwargs['auth'] = HTTPKerberosAuth(mutual_authentication=OPTIONAL,
principal=KerberosPrincipal,
service=KerberosRemoteServiceName,
hostname_override=hostname_override)
else:
if password is not None and 'auth' in requests_kwargs:
raise ValueError("Cannot use both password and requests_kwargs authentication")
for k in ('method', 'url', 'data', 'headers'):
if k in requests_kwargs:
raise ValueError("Cannot override requests argument {}".format(k))
if password is not None:
requests_kwargs['auth'] = HTTPBasicAuth(username, password)
if protocol != 'https':
raise ValueError("Protocol must be https when passing a password")
self._requests_kwargs = requests_kwargs
self._reset_state()
def _reset_state(self):
"""Reset state about the previous query in preparation for running another query"""
super(Cursor, self)._reset_state()
self._nextUri = None
self._columns = None
@property
def description(self):
"""This read-only attribute is a sequence of 7-item sequences.
Each of these sequences contains information describing one result column:
- name
- type_code
- display_size (None in current implementation)
- internal_size (None in current implementation)
- precision (None in current implementation)
- scale (None in current implementation)
- null_ok (always True in current implementation)
The ``type_code`` can be interpreted by comparing it to the Type Objects specified in the
section below.
"""
# Sleep until we're done or we got the columns
self._fetch_while(
lambda: self._columns is None and
self._state not in (self._STATE_NONE, self._STATE_FINISHED)
)
if self._columns is None:
return None
return [
# name, type_code, display_size, internal_size, precision, scale, null_ok
(col['name'], col['type'], None, None, None, None, True)
for col in self._columns
]
def execute(self, operation, parameters=None):
"""Prepare and execute a database operation (query or command).
Return values are not defined.
"""
headers = {
'X-Presto-Catalog': self._catalog,
'X-Presto-Schema': self._schema,
'X-Presto-Source': self._source,
'X-Presto-User': self._username,
}
if self._session_props:
headers['X-Presto-Session'] = ','.join(
'{}={}'.format(propname, propval)
for propname, propval in self._session_props.items()
)
# Prepare statement
if parameters is None:
sql = operation
else:
sql = operation % _escaper.escape_args(parameters)
self._reset_state()
self._state = self._STATE_RUNNING
url = urlparse.urlunparse((
self._protocol,
'{}:{}'.format(self._host, self._port), '/v1/statement', None, None, None))
_logger.info('%s', sql)
_logger.debug("Headers: %s", headers)
response = self._requests_session.post(
url, data=sql.encode('utf-8'), headers=headers, **self._requests_kwargs)
self._process_response(response)
def cancel(self):
if self._state == self._STATE_NONE:
raise ProgrammingError("No query yet")
if self._nextUri is None:
assert self._state == self._STATE_FINISHED, "Should be finished if nextUri is None"
return
response = self._requests_session.delete(self._nextUri, **self._requests_kwargs)
if response.status_code != requests.codes.no_content:
fmt = "Unexpected status code after cancel {}\n{}"
raise OperationalError(fmt.format(response.status_code, response.content))
self._state = self._STATE_FINISHED
self._nextUri = None
def poll(self):
"""Poll for and return the raw status data provided by the Presto REST API.
:returns: dict -- JSON status information or ``None`` if the query is done
:raises: ``ProgrammingError`` when no query has been started
.. note::
This is not a part of DB-API.
"""
if self._state == self._STATE_NONE:
raise ProgrammingError("No query yet")
if self._nextUri is None:
assert self._state == self._STATE_FINISHED, "Should be finished if nextUri is None"
return None
response = self._requests_session.get(self._nextUri, **self._requests_kwargs)
self._process_response(response)
return response.json()
def _fetch_more(self):
"""Fetch the next URI and update state"""
self._process_response(self._requests_session.get(self._nextUri, **self._requests_kwargs))
def _process_data(self, rows):
for i, col in enumerate(self.description):
col_type = col[1].split("(")[0].lower()
if col_type in TYPES_CONVERTER:
for row in rows:
if row[i] is not None:
row[i] = TYPES_CONVERTER[col_type](row[i])
def _process_response(self, response):
"""Given the JSON response from Presto's REST API, update the internal state with the next
URI and any data from the response
"""
# TODO handle HTTP 503
if response.status_code != requests.codes.ok:
fmt = "Unexpected status code {}\n{}"
raise OperationalError(fmt.format(response.status_code, response.content))
response_json = response.json()
_logger.debug("Got response %s", response_json)
assert self._state == self._STATE_RUNNING, "Should be running if processing response"
self._nextUri = response_json.get('nextUri')
self._columns = response_json.get('columns')
if 'id' in response_json:
self.last_query_id = response_json['id']
if 'X-Presto-Clear-Session' in response.headers:
propname = response.headers['X-Presto-Clear-Session']
self._session_props.pop(propname, None)
if 'X-Presto-Set-Session' in response.headers:
propname, propval = response.headers['X-Presto-Set-Session'].split('=', 1)
self._session_props[propname] = propval
if 'data' in response_json:
assert self._columns
new_data = response_json['data']
self._process_data(new_data)
self._data += map(tuple, new_data)
if 'nextUri' not in response_json:
self._state = self._STATE_FINISHED
if 'error' in response_json:
raise DatabaseError(response_json['error'])
#
# Type Objects and Constructors
#
# See types in presto-main/src/main/java/com/facebook/presto/tuple/TupleInfo.java
FIXED_INT_64 = DBAPITypeObject(['bigint'])
VARIABLE_BINARY = DBAPITypeObject(['varchar'])
DOUBLE = DBAPITypeObject(['double'])
BOOLEAN = DBAPITypeObject(['boolean'])