redash/query_runner/treasuredata.py (112 lines of code) (raw):

import logging from redash.query_runner import * from redash.utils import json_dumps logger = logging.getLogger(__name__) try: import tdclient from tdclient import errors enabled = True except ImportError: enabled = False TD_TYPES_MAPPING = { "bigint": TYPE_INTEGER, "tinyint": TYPE_INTEGER, "smallint": TYPE_INTEGER, "int": TYPE_INTEGER, "integer": TYPE_INTEGER, "long": TYPE_INTEGER, "double": TYPE_FLOAT, "decimal": TYPE_FLOAT, "float": TYPE_FLOAT, "real": TYPE_FLOAT, "boolean": TYPE_BOOLEAN, "timestamp": TYPE_DATETIME, "date": TYPE_DATETIME, "char": TYPE_STRING, "string": TYPE_STRING, "varchar": TYPE_STRING, } class TreasureData(BaseQueryRunner): should_annotate_query = False noop_query = "SELECT 1" @classmethod def configuration_schema(cls): return { "type": "object", "properties": { "endpoint": {"type": "string"}, "apikey": {"type": "string"}, "type": {"type": "string"}, "db": {"type": "string", "title": "Database Name"}, "get_schema": { "type": "boolean", "title": "Auto Schema Retrieval", "default": False, }, "toggle_table_string": { "type": "string", "title": "Toggle Table String", "default": "_v", "info": "This string will be used to toggle visibility of tables in the schema browser when editing a query in order to remove non-useful tables from sight.", }, }, "required": ["apikey", "db"], } @classmethod def enabled(cls): return enabled @classmethod def type(cls): return "treasuredata" def get_schema(self, get_stats=False): schema = {} if self.configuration.get("get_schema", False): try: with tdclient.Client(self.configuration.get("apikey"),endpoint=self.configuration.get("endpoint")) as client: for table in client.tables(self.configuration.get("db")): table_name = "{}.{}".format( self.configuration.get("db"), table.name ) for table_schema in table.schema: schema[table_name] = { "name": table_name, "columns": [column[0] for column in table.schema], } except Exception as ex: raise Exception("Failed getting schema") return list(schema.values()) def run_query(self, query, user): connection = tdclient.connect( endpoint=self.configuration.get("endpoint", "https://api.treasuredata.com"), apikey=self.configuration.get("apikey"), type=self.configuration.get("type", "hive").lower(), db=self.configuration.get("db"), ) cursor = connection.cursor() try: cursor.execute(query) columns_tuples = [ (i[0], TD_TYPES_MAPPING.get(i[1], None)) for i in cursor.show_job()["hive_result_schema"] ] columns = self.fetch_columns(columns_tuples) if cursor.rowcount == 0: rows = [] else: rows = [ dict(zip(([column["name"] for column in columns]), r)) for r in cursor.fetchall() ] data = {"columns": columns, "rows": rows} json_data = json_dumps(data) error = None except errors.InternalError as e: json_data = None error = "%s: %s" % ( str(e), cursor.show_job() .get("debug", {}) .get("stderr", "No stderr message in the response"), ) return json_data, error register(TreasureData)