lib/metric-config-parser/metric_config_parser/segment.py (142 lines of code) (raw):

from textwrap import dedent
from typing import TYPE_CHECKING, Any, Dict, Optional

import attr
import jinja2
from jinja2 import StrictUndefined

if TYPE_CHECKING:
    from .config import ConfigCollection
    from .analysis import AnalysisSpec
    from .experiment import ExperimentConfiguration

# AnalysisUnit is needed at runtime (it supplies attribute defaults below),
# so it cannot live under the TYPE_CHECKING guard.
from . import AnalysisUnit
from .errors import DefinitionNotFound
from .util import converter


@attr.s(frozen=True, slots=True)
class SegmentDataSource:
    """A table or view that segments can be defined against.

    The data considered when deciding whether a client belongs to a segment
    is bounded by ``window_start`` and ``window_end``. Both are integers
    counting days relative to enrollment (before or after). Ideally the
    window closes at or before the moment of enrollment so that a user's
    branch cannot bias segment assignment.

    Args:
        name (str): Name of the data source. Should be unique to avoid
            confusion.
        from_expression (str): FROM expression - usually a fully-qualified
            table name, sometimes a subquery. May contain the string
            ``{dataset}``, which is replaced with an app-specific dataset
            for Glean apps. When the expression is templated on dataset,
            ``default_dataset`` is mandatory.
        window_start (int, optional): Start of the window; see above.
        window_end (int, optional): End of the window; see above.
        client_id_column (str, optional): Column holding the ``client_id``
            (join key). Defaults to 'client_id'.
        submission_date_column (str, optional): Column holding the
            submission date (a date, not a timestamp). Defaults to
            'submission_date'.
        default_dataset (str, optional): Value substituted for ``{dataset}``
            in ``from_expression`` when none is supplied at runtime.
            Mandatory if ``from_expression`` contains a ``{dataset}``
            parameter.
        group_id_column (str, optional): Column holding the
            ``profile_group_id`` (join key). Defaults to 'profile_group_id'.
    """

    # Attribute order is part of the positional constructor signature.
    name = attr.ib(validator=attr.validators.instance_of(str))
    from_expression = attr.ib(validator=attr.validators.instance_of(str))
    window_start = attr.ib(type=int, default=0)
    window_end = attr.ib(type=int, default=0)
    client_id_column = attr.ib(type=str, default=AnalysisUnit.CLIENT.value)
    submission_date_column = attr.ib(type=str, default="submission_date")
    default_dataset = attr.ib(type=Optional[str], default=None)
    group_id_column = attr.ib(type=str, default=AnalysisUnit.PROFILE_GROUP.value)


@attr.s(frozen=True, slots=True)
class Segment:
    """An experiment segment.

    Args:
        name (str): Name of the segment; it is used as a column name.
        data_source (SegmentDataSource): Data source supplying the columns
            that ``select_expression`` refers to.
        select_expression (str): SQL select expression containing an
            aggregation function (the query does ``GROUP BY client_id``).
            It yields a non-NULL ``BOOL``: ``True`` when the user belongs to
            the segment and ``False`` otherwise.
        friendly_name (str): Human-readable dashboard title for the segment.
        description (str): Paragraph of Markdown-formatted text describing
            the segment in more detail, for display on dashboards.
    """

    # Attribute order is part of the positional constructor signature.
    name = attr.ib(type=str)
    data_source = attr.ib(validator=attr.validators.instance_of(SegmentDataSource))
    select_expression = attr.ib(type=str)
    friendly_name = attr.ib(default=None, type=Optional[str])
    description = attr.ib(default=None, type=Optional[str])


@attr.s(auto_attribs=True)
class SegmentReference:
    """A by-name pointer to a segment definition, resolved lazily."""

    name: str

    def resolve(
        self,
        spec: "AnalysisSpec",
        conf: "ExperimentConfiguration",
        configs: "ConfigCollection",
    ) -> Segment:
        """Look the referenced segment up and resolve it to a ``Segment``.

        Definitions found in ``spec.segments.definitions`` take precedence;
        otherwise ``configs`` is queried with this name and ``conf.app_name``.

        Raises:
            DefinitionNotFound: If neither location knows the segment.
        """
        spec_definitions = spec.segments.definitions
        if self.name in spec_definitions:
            return spec_definitions[self.name].resolve(spec, conf, configs)

        shared_definition = configs.get_segment_definition(self.name, conf.app_name)
        if shared_definition is not None:
            return shared_definition.resolve(spec, conf, configs)

        raise DefinitionNotFound(f"Could not find definition for segment '{self.name}'")
# A segment reference is written in config as a bare string (its name).
converter.register_structure_hook(SegmentReference, lambda obj, _type: SegmentReference(name=obj))


@attr.s(auto_attribs=True)
class SegmentDataSourceDefinition:
    """Unresolved description of a segment data source.

    Instances are structured from configuration (see
    ``SegmentsSpec.from_dict``) and turned into a ``SegmentDataSource`` by
    ``resolve``.
    """

    name: str
    from_expression: str
    window_start: int = 0
    window_end: int = 0
    client_id_column: Optional[str] = AnalysisUnit.CLIENT.value
    submission_date_column: Optional[str] = "submission_date"
    default_dataset: Optional[str] = None
    group_id_column: Optional[str] = AnalysisUnit.PROFILE_GROUP.value

    def resolve(
        self,
        spec: "AnalysisSpec",
        conf: "ExperimentConfiguration",
        _configs: "ConfigCollection",
    ) -> SegmentDataSource:
        """Build the concrete ``SegmentDataSource`` for an experiment.

        ``from_expression`` is rendered as a Jinja template with ``conf``
        available as ``experiment``. The environment uses ``StrictUndefined``,
        so referencing an undefined template variable raises instead of
        rendering as an empty string.

        Args:
            spec: Analysis spec; not used by this method.
            conf: Experiment configuration exposed to the template.
            _configs: Config collection; not used by this method.

        Returns:
            The resolved, immutable data source.
        """
        env = jinja2.Environment(autoescape=False, undefined=StrictUndefined)
        from_expression = env.from_string(self.from_expression).render(experiment=conf)

        kwargs: Dict[str, Any] = {
            "name": self.name,
            "from_expression": from_expression,
            "window_start": self.window_start,
            "window_end": self.window_end,
        }

        # Optional settings are forwarded only when truthy, so that None or an
        # empty string falls back to SegmentDataSource's own defaults.
        # ``default_dataset`` must be forwarded too; otherwise a configured
        # value is silently lost and ``{dataset}`` cannot be filled in later.
        optional_fields = (
            "client_id_column",
            "submission_date_column",
            "default_dataset",
            "group_id_column",
        )
        for field_name in optional_fields:
            value = getattr(self, field_name)
            if value:
                kwargs[field_name] = value

        return SegmentDataSource(**kwargs)


@attr.s(auto_attribs=True)
class SegmentDataSourceReference:
    """A by-name pointer to a segment data source definition."""

    name: str

    def resolve(
        self,
        spec: "AnalysisSpec",
        conf: "ExperimentConfiguration",
        configs: "ConfigCollection",
    ) -> SegmentDataSource:
        """Look the referenced data source up and resolve it.

        Definitions found in ``spec.segments.data_sources`` take precedence;
        otherwise ``configs`` is queried with this name and ``conf.app_name``.

        Raises:
            DefinitionNotFound: If neither location knows the data source.
        """
        if self.name in spec.segments.data_sources:
            return spec.segments.data_sources[self.name].resolve(spec, conf, configs)

        data_source_definition = configs.get_segment_data_source_definition(
            self.name, conf.app_name
        )
        if data_source_definition is None:
            raise DefinitionNotFound(
                f"Could not find definition for segment data source '{self.name}'"
            )
        return data_source_definition.resolve(spec, conf, configs)


# A data source reference is written in config as a bare string (its name).
converter.register_structure_hook(
    SegmentDataSourceReference, lambda obj, _type: SegmentDataSourceReference(name=obj)
)


@attr.s(auto_attribs=True)
class SegmentDefinition:
    """Unresolved description of a segment, as structured from configuration."""

    name: str
    data_source: SegmentDataSourceReference
    select_expression: str
    friendly_name: Optional[str] = None
    description: Optional[str] = None

    def resolve(
        self,
        spec: "AnalysisSpec",
        conf: "ExperimentConfiguration",
        configs: "ConfigCollection",
    ) -> Segment:
        """Resolve the data source reference and build the ``Segment``.

        ``select_expression`` is rendered (with no extra variables) through
        the template environment returned by ``configs.get_env()``.
        ``friendly_name`` and ``description`` are dedented when non-empty and
        passed through unchanged otherwise.

        Raises:
            DefinitionNotFound: If the referenced data source is unknown.
        """
        data_source = self.data_source.resolve(spec, conf, configs)
        select_expression = configs.get_env().from_string(self.select_expression).render()

        return Segment(
            name=self.name,
            data_source=data_source,
            select_expression=select_expression,
            friendly_name=(
                dedent(self.friendly_name) if self.friendly_name else self.friendly_name
            ),
            description=(dedent(self.description) if self.description else self.description),
        )


@attr.s(auto_attribs=True)
class SegmentsSpec:
    """The segment definitions and segment data sources declared by a config."""

    definitions: Dict[str, SegmentDefinition] = attr.Factory(dict)
    data_sources: Dict[str, SegmentDataSourceDefinition] = attr.Factory(dict)

    @staticmethod
    def _named_lowercase(name: str, raw: dict) -> dict:
        """Return ``raw`` with lower-cased keys plus a ``name`` entry.

        A ``name`` key present in ``raw`` overrides the supplied ``name``.
        """
        return {"name": name, **{key.lower(): value for key, value in raw.items()}}

    @classmethod
    def from_dict(cls, d: dict) -> "SegmentsSpec":
        """Structure a raw mapping into a ``SegmentsSpec``.

        The ``data_sources`` entry (if any) maps data source names to their
        settings; every other top-level entry is a segment definition keyed
        by segment name. Setting keys are lower-cased before structuring.

        ``d`` is only read, never modified, so the same raw mapping can be
        structured more than once.
        """
        data_sources = {
            name: converter.structure(
                cls._named_lowercase(name, raw), SegmentDataSourceDefinition
            )
            for name, raw in d.get("data_sources", {}).items()
        }
        definitions = {
            name: converter.structure(cls._named_lowercase(name, raw), SegmentDefinition)
            for name, raw in d.items()
            if name != "data_sources"
        }
        return cls(definitions, data_sources)

    def merge(self, other: "SegmentsSpec") -> None:
        """
        Merge another segments spec into the current one.
        The `other` SegmentsSpec overwrites existing keys.
        """
        self.data_sources.update(other.data_sources)
        self.definitions.update(other.definitions)


converter.register_structure_hook(SegmentsSpec, lambda obj, _type: SegmentsSpec.from_dict(obj))