redash/query_runner/impala_ds.py (120 lines of code) (raw):
import logging
from redash.query_runner import *
from redash.utils import json_dumps
logger = logging.getLogger(__name__)
try:
from impala.dbapi import connect
from impala.error import DatabaseError, RPCError
enabled = True
except ImportError as e:
enabled = False
COLUMN_NAME = 0
COLUMN_TYPE = 1
types_map = {
"BIGINT": TYPE_INTEGER,
"TINYINT": TYPE_INTEGER,
"SMALLINT": TYPE_INTEGER,
"INT": TYPE_INTEGER,
"DOUBLE": TYPE_FLOAT,
"DECIMAL": TYPE_FLOAT,
"FLOAT": TYPE_FLOAT,
"REAL": TYPE_FLOAT,
"BOOLEAN": TYPE_BOOLEAN,
"TIMESTAMP": TYPE_DATETIME,
"CHAR": TYPE_STRING,
"STRING": TYPE_STRING,
"VARCHAR": TYPE_STRING,
}
class Impala(BaseSQLQueryRunner):
noop_query = "show schemas"
@classmethod
def configuration_schema(cls):
return {
"type": "object",
"properties": {
"host": {"type": "string"},
"port": {"type": "number"},
"protocol": {
"type": "string",
"extendedEnum": [
{"value": "beeswax", "name": "Beeswax"},
{"value": "hiveserver2", "name": "Hive Server 2"},
],
"title": "Protocol",
},
"database": {"type": "string"},
"use_ldap": {"type": "boolean"},
"ldap_user": {"type": "string"},
"ldap_password": {"type": "string"},
"timeout": {"type": "number"},
"toggle_table_string": {
"type": "string",
"title": "Toggle Table String",
"default": "_v",
"info": "This string will be used to toggle visibility of tables in the schema browser when editing a query in order to remove non-useful tables from sight.",
},
},
"required": ["host"],
"secret": ["ldap_password"],
}
@classmethod
def type(cls):
return "impala"
def _get_tables(self, schema_dict):
schemas_query = "show schemas;"
tables_query = "show tables in %s;"
columns_query = "show column stats %s.%s;"
for schema_name in [
str(a["name"]) for a in self._run_query_internal(schemas_query)
]:
for table_name in [
str(a["name"])
for a in self._run_query_internal(tables_query % schema_name)
]:
columns = [
str(a["Column"])
for a in self._run_query_internal(
columns_query % (schema_name, table_name)
)
]
if schema_name != "default":
table_name = "{}.{}".format(schema_name, table_name)
schema_dict[table_name] = {"name": table_name, "columns": columns}
return list(schema_dict.values())
def run_query(self, query, user):
connection = None
try:
connection = connect(**self.configuration.to_dict())
cursor = connection.cursor()
cursor.execute(query)
column_names = []
columns = []
for column in cursor.description:
column_name = column[COLUMN_NAME]
column_names.append(column_name)
columns.append(
{
"name": column_name,
"friendly_name": column_name,
"type": types_map.get(column[COLUMN_TYPE], None),
}
)
rows = [dict(zip(column_names, row)) for row in cursor]
data = {"columns": columns, "rows": rows}
json_data = json_dumps(data)
error = None
cursor.close()
except DatabaseError as e:
json_data = None
error = str(e)
except RPCError as e:
json_data = None
error = "Metastore Error [%s]" % str(e)
except (KeyboardInterrupt, JobTimeoutException):
connection.cancel()
raise
finally:
if connection:
connection.close()
return json_data, error
register(Impala)