lib/metric-config-parser/metric_config_parser/sql.py (112 lines of code) (raw):

import os
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

from .metric import MetricDefinition

if TYPE_CHECKING:
    from .config import ConfigCollection

FILE_PATH = Path(os.path.dirname(__file__))
METRICS_QUERY = FILE_PATH / "templates" / "metrics_query.sql"
DATA_SOURCE_QUERY = FILE_PATH / "templates" / "data_source_query.sql"
DATA_SOURCE_MACROS = FILE_PATH / "templates" / "data_source_macros.j2"

# Include statement that the query templates use to pull in the shared macros.
_MACROS_INCLUDE = "{% include 'data_source_macros.j2' %}"


def _load_template(template_path: Path) -> str:
    """Read a query template and inline the shared data source macros.

    Using `from_string()` in Jinja doesn't support include statements, so the
    include is substituted manually with the content of the macros file.
    """
    template = template_path.read_text()
    return template.replace(_MACROS_INCLUDE, DATA_SOURCE_MACROS.read_text())


def _apply_default_columns(data_source: Any) -> None:
    """Fill in the default client ID and submission date columns in place.

    Default parameters need to be set explicitly, otherwise they'll be None
    when the templates are rendered.
    """
    data_source.client_id_column = data_source.client_id_column or "client_id"
    data_source.submission_date_column = (
        data_source.submission_date_column or "submission_date"
    )


def _data_sources_for_platform(
    config_collection: "ConfigCollection", platform: str
) -> Dict[str, Any]:
    """Map slug -> data source for every definition matching `platform`."""
    return {
        slug: data_source
        for definition in config_collection.definitions
        for slug, data_source in definition.spec.data_sources.definitions.items()
        if platform == definition.platform
    }


def generate_metrics_sql(
    config_collection: "ConfigCollection",
    metrics: List[str],
    platform: str,
    group_by: Optional[Union[List[str], Dict[str, str]]] = None,
    where: Optional[str] = None,
    group_by_client_id: bool = True,
    group_by_submission_date: bool = True,
) -> str:
    """Generates a SQL query for metrics and specified parameters.

    Args:
        config_collection: Collection used to look up metric and data source
            definitions and to obtain the templating environment.
        metrics: Slugs of the metrics to include in the query.
        platform: Platform the metric and data source definitions are
            resolved for.
        group_by: Fields to group by. Either a dictionary with the key being
            the alias and the value the potentially nested field, or a list
            if all fields are top-level fields that don't need an alias.
            Omitting it is equivalent to passing an empty list.
        where: Optional filter passed through to the template as `where`.
        group_by_client_id: Passed through to the template.
        group_by_submission_date: Passed through to the template.

    Returns:
        The rendered metrics query.

    Raises:
        ValueError: If a metric has no definition for the platform, no select
            expression, no data source, or its data source has no valid
            definition.

    Note:
        The metric and data source objects returned by `config_collection`
        are updated in place: `select_expression` is replaced with its
        rendered form and unset client ID / submission date columns are
        filled with their defaults.
    """
    metric_definitions: List[MetricDefinition] = []
    for slug in metrics:
        definition = config_collection.get_metric_definition(slug, platform)
        if definition is None:
            raise ValueError(f"No definition for metric {slug} on platform {platform} found.")
        metric_definitions.append(definition)

    metrics_per_data_source: Dict[str, Any] = {}
    for metric in metric_definitions:
        if metric.select_expression is None:
            raise ValueError(f"No definition for metric {metric.name}")

        # NOTE(review): this overwrites the expression on the definition
        # object itself; whether that object is shared between calls depends
        # on `get_metric_definition` - confirm.
        metric.select_expression = (
            config_collection.get_env().from_string(metric.select_expression).render()
        )

        if metric.data_source is None:
            raise ValueError(f"No data source for metric {metric.name}")

        source_name = metric.data_source.name
        existing = metrics_per_data_source.get(source_name)
        if existing is not None:
            existing["metrics"].append(metric)
            continue

        # first metric for this data source: resolve the definition once
        data_source = config_collection.get_data_source_definition(source_name, platform)
        if data_source is None:
            raise ValueError(f"No valid data source definition found for metric {metric.name}")

        _apply_default_columns(data_source)
        metrics_per_data_source[source_name] = {
            "data_source": data_source,
            "metrics": [metric],
        }

    # group by should be a dictionary with the key being the alias and
    # the value the potentially nested field;
    # it can also be specified as list if all fields are top-level fields that don't need an alias
    if group_by is None:
        group_by = {}
    elif isinstance(group_by, list):
        group_by = {g: g for g in group_by}

    template = _load_template(METRICS_QUERY)

    return (
        config_collection.get_env()
        .from_string(template)
        .render(
            metrics_per_data_source=metrics_per_data_source,
            where=where,
            group_by=group_by,
            group_by_client_id=group_by_client_id,
            group_by_submission_date=group_by_submission_date,
            data_sources=_data_sources_for_platform(config_collection, platform),
            select_fields=True,
            ignore_joins=False,
        )
    )


def generate_data_source_sql(
    config_collection: "ConfigCollection",
    data_source: str,
    platform: str,
    where: Optional[str] = None,
    select_fields: bool = True,
    ignore_joins: bool = False,
) -> str:
    """Generates a SQL query for the specified data source.

    Args:
        config_collection: Collection used to look up the data source
            definition and to obtain the templating environment.
        data_source: Slug of the data source to generate the query for.
        platform: Platform the data source definition is resolved for.
        where: Optional filter passed through to the template as `where`.
        select_fields: Passed through to the template.
        ignore_joins: Passed through to the template.

    Returns:
        The rendered data source query.

    Raises:
        ValueError: If no valid definition exists for the data source.

    Note:
        The data source definition returned by `config_collection` is updated
        in place: unset client ID / submission date columns are filled with
        their defaults.
    """
    template = _load_template(DATA_SOURCE_QUERY)

    data_source_definition = config_collection.get_data_source_definition(data_source, platform)
    if data_source_definition is None:
        raise ValueError(f"No valid data source definition found for {data_source}")

    _apply_default_columns(data_source_definition)

    return (
        config_collection.get_env()
        .from_string(template)
        .render(
            data_source=data_source_definition,
            data_sources=_data_sources_for_platform(config_collection, platform),
            where=where,
            select_fields=select_fields,
            ignore_joins=ignore_joins,
        )
    )