redash/query_runner/dynamodb_sql.py (117 lines of code) (raw):

import logging import sys from redash.query_runner import * from redash.utils import json_dumps logger = logging.getLogger(__name__) try: from dql import Engine, FragmentEngine from dynamo3 import DynamoDBError from pyparsing import ParseException enabled = True except ImportError as e: enabled = False types_map = { "UNICODE": TYPE_INTEGER, "TINYINT": TYPE_INTEGER, "SMALLINT": TYPE_INTEGER, "INT": TYPE_INTEGER, "DOUBLE": TYPE_FLOAT, "DECIMAL": TYPE_FLOAT, "FLOAT": TYPE_FLOAT, "REAL": TYPE_FLOAT, "BOOLEAN": TYPE_BOOLEAN, "TIMESTAMP": TYPE_DATETIME, "DATE": TYPE_DATETIME, "CHAR": TYPE_STRING, "STRING": TYPE_STRING, "VARCHAR": TYPE_STRING, } class DynamoDBSQL(BaseSQLQueryRunner): should_annotate_query = False @classmethod def configuration_schema(cls): return { "type": "object", "properties": { "region": {"type": "string", "default": "us-east-1"}, "access_key": {"type": "string"}, "secret_key": {"type": "string"}, "toggle_table_string": { "type": "string", "title": "Toggle Table String", "default": "_v", "info": "This string will be used to toggle visibility of tables in the schema browser when editing a query in order to remove non-useful tables from sight.", }, }, "required": ["access_key", "secret_key"], "secret": ["secret_key"], } def test_connection(self): engine = self._connect() list(engine.connection.list_tables()) @classmethod def type(cls): return "dynamodb_sql" @classmethod def name(cls): return "DynamoDB (with DQL)" def _connect(self): engine = FragmentEngine() config = self.configuration.to_dict() if not config.get("region"): config["region"] = "us-east-1" if config.get("host") == "": config["host"] = None engine.connect(**config) return engine def _get_tables(self, schema): engine = self._connect() # We can't use describe_all because sometimes a user might give List permission # for * (all tables), but describe permission only for some of them. tables = engine.connection.list_tables() for table_name in tables: try: table = engine.describe(table_name, True) schema[table.name] = { "name": table.name, "columns": list(table.attrs.keys()), } except DynamoDBError: pass def run_query(self, query, user): engine = None try: engine = self._connect() if not query.endswith(";"): query = query + ";" result = engine.execute(query) columns = [] rows = [] # When running a count query it returns the value as a string, in which case # we transform it into a dictionary to be the same as regular queries. if isinstance(result, str): # when count < scanned_count, dql returns a string with number of rows scanned value = result.split(" (")[0] if value: value = int(value) result = [{"value": value}] for item in result: if not columns: for k, v in item.items(): columns.append( { "name": k, "friendly_name": k, "type": types_map.get(str(type(v)).upper(), None), } ) rows.append(item) data = {"columns": columns, "rows": rows} json_data = json_dumps(data) error = None except ParseException as e: error = "Error parsing query at line {} (column {}):\n{}".format( e.lineno, e.column, e.line ) json_data = None except (KeyboardInterrupt, JobTimeoutException): if engine and engine.connection: engine.connection.cancel() raise return json_data, error register(DynamoDBSQL)