# lib/metric-config-parser/metric_config_parser/data_source.py

import fnmatch
import re
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

import attr

from metric_config_parser.errors import DefinitionNotFound

if TYPE_CHECKING:
    from metric_config_parser.config import ConfigCollection

    from .experiment import ExperimentConfiguration
    from .definition import DefinitionSpecSub
    from .project import ProjectConfiguration

from . import AnalysisUnit
from .util import converter, is_valid_slug


class DataSourceJoinRelationship(Enum):
    ONE_TO_ONE = "one_to_one"
    MANY_TO_ONE = "many_to_one"
    ONE_TO_MANY = "one_to_many"
    MANY_TO_MANY = "many_to_many"

    @staticmethod
    def from_str(label):
        match label:
            case "one_to_one":
                return DataSourceJoinRelationship.ONE_TO_ONE
            case "many_to_one":
                return DataSourceJoinRelationship.MANY_TO_ONE
            case "one_to_many":
                return DataSourceJoinRelationship.ONE_TO_MANY
            case "many_to_many":
                return DataSourceJoinRelationship.MANY_TO_MANY
            case _:
                raise NotImplementedError


@attr.s(auto_attribs=True)
class DataSourceJoin:
    data_source: "DataSource"
    relationship: Optional[DataSourceJoinRelationship]
    on_expression: Optional[str]


@attr.s(frozen=True, slots=True)
class DataSource:
    """Represents a table or view, from which Metrics may be defined.

    Args:
        name (str): Name for the Data Source. Used in sanity metric
            column names.
        from_expression (str): FROM expression - often just a fully-qualified
            table name. Sometimes a subquery. May contain the string
            ``{dataset}``, which will be replaced with an app-specific
            dataset for Glean apps. If the expression is templated on
            dataset, ``default_dataset`` is mandatory.
        experiments_column_type (str or None): Info about the schema
            of the table or view:

            * 'simple': There is an ``experiments`` column, which is an
              (experiment_slug:str -> branch_name:str) map.
            * 'native': There is an ``experiments`` column, which is an
              (experiment_slug:str -> struct) map, where the struct
              contains a ``branch`` field, which is the branch as a string.
            * None: There is no ``experiments`` column, so skip the
              sanity checks that rely on it. We'll also be unable to
              filter out pre-enrollment data from day 0 in the experiment.
        client_id_column (str, optional): Name of the column that
            contains the ``client_id`` (join key). Defaults to
            'client_id'.
        submission_date_column (str, optional): Name of the column that
            contains the submission date (as a date, not a timestamp).
            Defaults to 'submission_date'.
        default_dataset (str, optional): The value to use for
            ``{dataset}`` in ``from_expression`` if a value is not
            provided at runtime. Mandatory if ``from_expression``
            contains a ``{dataset}`` parameter.
        build_id_column (str, optional): Defaults to
            'SAFE.SUBSTR(application.build_id, 0, 8)'.
        friendly_name (str, optional)
        description (str, optional)
        joins (list[DataSourceJoin], optional)
        columns_as_dimensions (bool, optional): Defaults to False.
        analysis_units (list[AnalysisUnit], optional): Denotes which
            aggregations are supported by this data source. At the time
            of writing, this means 'client_id', 'profile_group_id', or
            both. Defaults to both: ['client_id', 'profile_group_id'].
        group_id_column (str, optional): Name of the column that contains
            the ``profile_group_id`` (join key). Defaults to
            'profile_group_id'.
""" name = attr.ib(validator=attr.validators.instance_of(str)) from_expression = attr.ib(validator=attr.validators.instance_of(str)) experiments_column_type = attr.ib(default="simple", type=str) client_id_column = attr.ib(default=AnalysisUnit.CLIENT.value, type=str) submission_date_column = attr.ib(default="submission_date", type=str) default_dataset = attr.ib(default=None, type=Optional[str]) build_id_column = attr.ib(default="SAFE.SUBSTR(application.build_id, 0, 8)", type=str) friendly_name = attr.ib(default=None, type=str) description = attr.ib(default=None, type=str) joins = attr.ib(default=None, type=List[DataSourceJoin]) columns_as_dimensions = attr.ib(default=False, type=bool) analysis_units = attr.ib( default=[AnalysisUnit.CLIENT, AnalysisUnit.PROFILE_GROUP], type=List[AnalysisUnit] ) group_id_column = attr.ib(default=AnalysisUnit.PROFILE_GROUP.value, type=str) EXPERIMENT_COLUMN_TYPES = (None, "simple", "native", "glean") @experiments_column_type.validator def _check_experiments_column_type(self, attribute, value): if value not in self.EXPERIMENT_COLUMN_TYPES: raise ValueError( f"experiments_column_type {repr(value)} must be one of: " f"{repr(self.EXPERIMENT_COLUMN_TYPES)}" ) @default_dataset.validator def _check_default_dataset_provided_if_needed(self, attribute, value): self.from_expr_for(None) def from_expr_for(self, dataset: Optional[str]) -> str: """Expands the ``from_expression`` template for the given dataset. If ``from_expression`` is not a template, returns ``from_expression``. Args: dataset (str or None): Dataset name to substitute into the from expression. """ effective_dataset = dataset or self.default_dataset if effective_dataset is None: try: return self.from_expression.format() except Exception as e: raise ValueError( f"{self.name}: from_expression contains a dataset template but no value was provided." 


@attr.s(auto_attribs=True)
class DataSourceReference:
    name: str

    def resolve(
        self,
        spec: "DefinitionSpecSub",
        conf: Union["ExperimentConfiguration", "ProjectConfiguration"],
        configs: "ConfigCollection",
    ) -> DataSource:
        if self.name in spec.data_sources.definitions:
            return spec.data_sources.definitions[self.name].resolve(spec, conf, configs)

        data_source_definition = configs.get_data_source_definition(self.name, conf.app_name)
        if data_source_definition is None:
            raise DefinitionNotFound(f"No default definition for data source '{self.name}' found")
        return data_source_definition.resolve(spec, conf, configs)


converter.register_structure_hook(
    DataSourceReference, lambda obj, _type: DataSourceReference(name=obj)
)


@attr.s(auto_attribs=True)
class DataSourceDefinition:
    """Describes the interface for defining a data source in configuration."""

    name: str  # implicit in configuration
    from_expression: Optional[str] = None
    experiments_column_type: Optional[str] = None
    client_id_column: Optional[str] = None
    submission_date_column: Optional[str] = None
    default_dataset: Optional[str] = None
    build_id_column: Optional[str] = None
    friendly_name: Optional[str] = None
    description: Optional[str] = None
    joins: Optional[Dict[str, Dict[str, Any]]] = None
    columns_as_dimensions: Optional[bool] = None
    analysis_units: Optional[list[AnalysisUnit]] = None
    group_id_column: Optional[str] = None

    def resolve(
        self,
        spec: "DefinitionSpecSub",
        conf: Union["ExperimentConfiguration", "ProjectConfiguration"],
        configs: "ConfigCollection",
    ) -> DataSource:
        if not is_valid_slug(self.name):
            # A data source name must not contain a wildcard `*` at this
            # point: if it still does, it was never matched against a
            # concrete definition and is a dangling wildcard.
            raise ValueError(
                f"Invalid identifier found in name {self.name}. "
                "Name must consist of at least one character, number, or underscore. "
                "Wildcard characters are only allowed if a matching slug is defined."
            )

        params: Dict[str, Any] = {
            "name": self.name,
            "from_expression": self.from_expression,
        }

        # Allow mozanalysis to infer defaults for these values:
        for k in (
            "experiments_column_type",
            "client_id_column",
            "submission_date_column",
            "default_dataset",
            "build_id_column",
            "friendly_name",
            "description",
            "columns_as_dimensions",
            "analysis_units",
            "group_id_column",
        ):
            v = getattr(self, k)
            if v:
                params[k] = v

        # experiments_column_type is a little special, though!
        # `None` is a valid value, which means there isn't any `experiments`
        # column in the data source, so mozanalysis shouldn't try to use it.
        # But mozanalysis has a different default value for that param
        # ("simple"), and TOML can't represent an explicit null. So we'll look
        # for the string "none" and transform it to the value None.
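        #
        # For example, a TOML definition like the following (the data source
        # name and table are hypothetical):
        #
        #     [data_sources.my_source]
        #     from_expression = "mozdata.my_dataset.my_table"
        #     experiments_column_type = "none"
        #
        # resolves to a DataSource with ``experiments_column_type=None``.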
if (self.experiments_column_type or "").lower() == "none": params["experiments_column_type"] = None # resolve the data source joins if self.joins and len(self.joins) > 0: params["joins"] = [ DataSourceJoin( data_source=DataSourceReference(name=data_source).resolve(spec, conf, configs), relationship=( DataSourceJoinRelationship.from_str(join["relationship"]) if "relationship" in join else None ), on_expression=join.get("on_expression", None), ) for data_source, join in self.joins.items() ] return DataSource(**params) def merge(self, other: "DataSourceDefinition"): """Merge with another data source definition.""" for key in attr.fields_dict(type(self)): if key != "name": setattr(self, key, getattr(other, key) or getattr(self, key)) if key == "joins": if getattr(other, key) is not None: setattr(self, key, getattr(other, key)) @attr.s(auto_attribs=True) class DataSourcesSpec: """Holds data source definitions. This doesn't have a resolve() method to produce a concrete DataSourcesConfiguration because it's just a container for the definitions, and we don't need it after the spec phase. """ definitions: Dict[str, DataSourceDefinition] = attr.Factory(dict) @classmethod def from_dict(cls, d: dict) -> "DataSourcesSpec": definitions = { k: converter.structure( {"name": k, **dict((kk.lower(), vv) for kk, vv in v.items())}, DataSourceDefinition, ) for k, v in d.items() } return cls(definitions) def merge(self, other: "DataSourcesSpec"): """ Merge another datasource spec into the current one. The `other` DataSourcesSpec overwrites existing keys. """ seen = set() for key, _ in self.definitions.items(): for other_key in other.definitions: # support wildcard characters in `other` other_key_regex = re.compile(fnmatch.translate(other_key)) if other_key_regex.fullmatch(key): self.definitions[key].merge(other.definitions[other_key]) seen.add(other_key) seen.add(key) for key, definition in other.definitions.items(): if key not in seen and is_valid_slug(key): self.definitions[key] = definition converter.register_structure_hook( DataSourcesSpec, lambda obj, _type: DataSourcesSpec.from_dict(obj) )