# lib/metric-config-parser/metric_config_parser/metric.py
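"""Metric configuration parsing and resolution.

Defines the metric-related configuration schema (`MetricDefinition`,
`MetricsSpec`) and resolves it into concrete `Metric` and `Summary` objects.
"""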
import copy
import fnmatch
import re
from enum import Enum
from textwrap import dedent
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

import attr
import jinja2
from mozilla_nimbus_schemas.jetstream import AnalysisBasis

from metric_config_parser.errors import DefinitionNotFound

if TYPE_CHECKING:
    from .analysis import AnalysisSpec
    from .config import ConfigCollection
    from .experiment import ExperimentConfiguration
    from .definition import DefinitionSpecSub
    from .project import ProjectConfiguration

# Imported at module level because they are used at runtime (default values,
# attrs field types, and instantiation), not only for type checking.
from . import AnalysisUnit
from .data_source import DataSource, DataSourceReference
from .parameter import ParameterDefinition
from .pre_treatment import PreTreatmentReference
from .statistic import Statistic
from .util import converter, is_valid_slug


class AnalysisPeriod(Enum):
DAY = "day"
WEEK = "week"
DAYS_28 = "days28"
OVERALL = "overall"
PREENROLLMENT_WEEK = "preenrollment_week"
PREENROLLMENT_DAYS_28 = "preenrollment_days28"

    @property
def mozanalysis_label(self) -> str:
d = {
"day": "daily",
"week": "weekly",
"days28": "28_day",
"overall": "overall",
"preenrollment_week": "preenrollment_weekly",
"preenrollment_days28": "preenrollment_days28",
}
return d[self.value]

    @property
def table_suffix(self) -> str:
d = {
"day": "daily",
"week": "weekly",
"days28": "days28",
"overall": "overall",
"preenrollment_week": "preenrollment_weekly",
"preenrollment_days28": "preenrollment_days28",
}
return d[self.value]
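
# Illustrative mapping, taken from the dicts above:
#   AnalysisPeriod.DAYS_28.mozanalysis_label -> "28_day"
#   AnalysisPeriod.DAYS_28.table_suffix -> "days28"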


class MetricLevel(Enum):
GOLD = "gold"
SILVER = "silver"
BRONZE = "bronze"


@attr.s(auto_attribs=True)
class Summary:
"""Represents a metric with a statistical treatment."""
metric: "Metric"
statistic: "Statistic"
pre_treatments: List[PreTreatmentReference] = attr.Factory(list)


@attr.s(auto_attribs=True, frozen=True, slots=True)
class Metric:
"""
Metric representation.
Metrics are supersets of mozanalysis metrics with additional
metadata required for analysis.
"""
name: str
data_source: Optional[DataSource]
select_expression: Optional[str]
friendly_name: Optional[str] = None
description: Optional[str] = None
bigger_is_better: bool = True
    analysis_bases: List[AnalysisBasis] = attr.Factory(
        # use a factory so the default list is not shared across instances
        lambda: [AnalysisBasis.ENROLLMENTS, AnalysisBasis.EXPOSURES]
    )
type: str = "scalar"
category: Optional[str] = None
depends_on: Optional[List[Summary]] = None
owner: Optional[List[str]] = None
deprecated: bool = False
level: Optional[MetricLevel] = None
    analysis_units: List[AnalysisUnit] = attr.Factory(
        # use a factory so the default list is not shared across instances
        lambda: [AnalysisUnit.CLIENT, AnalysisUnit.PROFILE_GROUP]
    )
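
    # Illustrative sketch of a resolved metric (field values are made up, and
    # `main_data_source` stands in for a resolved DataSource):
    #
    #     Metric(
    #         name="active_hours",
    #         data_source=main_data_source,
    #         select_expression="SUM(active_hours_sum)",
    #         friendly_name="Active hours",
    #         analysis_bases=[AnalysisBasis.ENROLLMENTS],
    #     )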


@attr.s(auto_attribs=True)
class MetricReference:
name: str

    def resolve(
self,
spec: "AnalysisSpec",
conf: Union["ExperimentConfiguration", "ProjectConfiguration"],
configs: "ConfigCollection",
) -> List[Summary]:
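        """Resolve this reference into `Summary` objects, checking the spec's
        own definitions first and external config definitions second."""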
if self.name in spec.metrics.definitions:
return spec.metrics.definitions[self.name].resolve(spec, conf, configs)
metric_definition = configs.get_metric_definition(self.name, conf.app_name)
if metric_definition:
return metric_definition.resolve(spec, conf, configs=configs)
raise DefinitionNotFound(f"Could not locate metric {self.name}")


# Metric references appear as bare strings in configuration files.
converter.register_structure_hook(MetricReference, lambda obj, _type: MetricReference(name=obj))
converter.register_structure_hook(Union[str, List[str], None], lambda obj, _type: obj)
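# For example, a bare string like "active_hours" (illustrative name) in a
# metrics list is structured into MetricReference(name="active_hours") by the
# hook above.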


@attr.s(auto_attribs=True)
class MetricDefinition:
"""Describes the interface for defining a metric in configuration.
The `select_expression` of the metric may use Jinja2 template syntax to refer to the
aggregation helper functions defined in `mozanalysis.metrics`, like
'{{agg_any("payload.processes.scalars.some_boolean_thing")}}'
"""
name: str # implicit in configuration
statistics: Optional[Dict[str, Dict[str, Any]]] = None
select_expression: Optional[str] = None
data_source: Optional[DataSourceReference] = None
friendly_name: Optional[str] = None
description: Optional[str] = None
bigger_is_better: bool = True
analysis_bases: Optional[List[AnalysisBasis]] = None
type: Optional[str] = None
category: Optional[str] = None
depends_on: Optional[List[MetricReference]] = None
owner: Optional[Union[str, List[str]]] = None
deprecated: bool = False
level: Optional[MetricLevel] = None
analysis_units: Optional[List[AnalysisUnit]] = None
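
    # A definition typically comes from a config section like the following
    # (illustrative sketch; the metric, column, and statistic names are made up):
    #
    #     [metrics.active_hours]
    #     select_expression = "SUM(active_hours_sum)"
    #     data_source = "main"
    #
    #     [metrics.active_hours.statistics.bootstrap_mean]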

    @staticmethod
def generate_select_expression(
param_definitions: Dict[str, ParameterDefinition],
select_expr_template: Union[str, jinja2.nodes.Template],
configs: "ConfigCollection",
) -> str:
"""
Takes in param configuration and converts it to a select statement string
"""
if "parameters" not in str(select_expr_template):
return configs.get_env().from_string(select_expr_template).render()
        formatted_params: Dict[str, Any] = {}
for param_name, param_definition in param_definitions.items():
if param_definition.distinct_by_branch and isinstance(param_definition.value, dict):
formatted_params.update(
{
param_name: "CASE e.branch "
+ " ".join(
[
f'WHEN "{branch}" THEN "{value}"'
for branch, value in param_definition.value.items()
]
)
+ " END"
}
)
else:
formatted_params.update({param_name: param_definition.value})
return (
configs.get_env().from_string(select_expr_template).render(parameters=formatted_params)
)
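
    # Example of the branch-dependent rendering above: a parameter with
    # distinct_by_branch set and value {"control": "a", "treatment": "b"}
    # renders as:
    #     CASE e.branch WHEN "control" THEN "a" WHEN "treatment" THEN "b" END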

    def resolve(
self,
spec: "DefinitionSpecSub",
conf: Union["ExperimentConfiguration", "ProjectConfiguration"],
configs: "ConfigCollection",
) -> List[Summary]:
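        """Resolve this definition, including any upstream dependencies and
        statistics, into a list of `Summary` objects."""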
metric_summary = None
metric = None
upstream_metrics = None
# check if metric depends on other metrics
if self.depends_on:
upstream_metrics = []
# resolve upstream metrics
for metric_ref in self.depends_on:
# check if upstream metric is defined externally as a "definition"
upstream_metric = configs.get_metric_definition(metric_ref.name, conf.app_name)
if upstream_metric is None:
# check if upstream metric is part of the analysis spec
upstream_metric = spec.metrics.definitions.get(metric_ref.name, None)
if upstream_metric is None:
raise DefinitionNotFound(
f"No definition found for referenced upstream metric {metric_ref}"
)
upstream_metrics += upstream_metric.resolve(spec, conf, configs)
if self.select_expression is None or self.data_source is None:
            # check if a metric from mozanalysis was referenced
metric_definition = configs.get_metric_definition(self.name, conf.app_name)
if metric_definition is None and upstream_metrics is None:
raise DefinitionNotFound(
f"No default definition found for referenced metric {self.name}"
)
elif upstream_metrics:
metric = Metric(
name=self.name,
data_source=None,
select_expression=None,
friendly_name=(
dedent(self.friendly_name) if self.friendly_name else self.friendly_name
),
description=(
dedent(self.description) if self.description else self.description
),
bigger_is_better=self.bigger_is_better,
analysis_bases=self.analysis_bases
or [AnalysisBasis.ENROLLMENTS, AnalysisBasis.EXPOSURES],
type=self.type or "scalar",
category=self.category,
depends_on=upstream_metrics,
owner=[self.owner] if isinstance(self.owner, str) else self.owner,
deprecated=self.deprecated,
level=self.level,
analysis_units=self.analysis_units
or [AnalysisUnit.CLIENT, AnalysisUnit.PROFILE_GROUP],
)
elif metric_definition:
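                # Note: this overrides fields on the definition object returned
                # by `configs` in place before re-resolving it.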
metric_definition.analysis_bases = (
self.analysis_bases
or metric_definition.analysis_bases
or [
AnalysisBasis.ENROLLMENTS,
AnalysisBasis.EXPOSURES,
]
)
                # fall back to the external definition's statistics if unset,
                # mirroring the `or` fallbacks on the surrounding fields
                metric_definition.statistics = self.statistics or metric_definition.statistics
metric_definition.analysis_units = (
self.analysis_units
or metric_definition.analysis_units
or [AnalysisUnit.CLIENT, AnalysisUnit.PROFILE_GROUP]
)
metric_summary = metric_definition.resolve(spec, conf, configs)
else:
select_expression = self.generate_select_expression(
spec.parameters.definitions,
select_expr_template=self.select_expression,
configs=configs,
)
            # ensure all of the metric's analysis_units are supported by the data_source
resolved_ds = self.data_source.resolve(spec, conf, configs)
analysis_units = self.analysis_units or [
AnalysisUnit.CLIENT,
AnalysisUnit.PROFILE_GROUP,
]
for agg_unit in analysis_units:
if agg_unit not in resolved_ds.analysis_units:
raise ValueError(
f"data_source {resolved_ds.name} does not support "
f"all analysis_units specified by metric {self.name}: "
f"analysis_units for metric: {analysis_units}, "
f"analysis_units for data_source: {resolved_ds.analysis_units}"
)
metric = Metric(
name=self.name,
data_source=resolved_ds,
select_expression=select_expression,
friendly_name=(
dedent(self.friendly_name) if self.friendly_name else self.friendly_name
),
description=(dedent(self.description) if self.description else self.description),
bigger_is_better=self.bigger_is_better,
analysis_bases=self.analysis_bases
or [AnalysisBasis.ENROLLMENTS, AnalysisBasis.EXPOSURES],
type=self.type or "scalar",
category=self.category,
depends_on=upstream_metrics,
owner=[self.owner] if isinstance(self.owner, str) else self.owner,
deprecated=self.deprecated,
level=self.level,
analysis_units=analysis_units,
)
metrics_with_treatments = []
if metric_summary:
if self.statistics:
for statistic_name, params in self.statistics.items():
stats_params = copy.deepcopy(params)
pre_treatments = []
for pt in stats_params.pop("pre_treatments", []):
if isinstance(pt, str):
ref = PreTreatmentReference(pt, {})
else:
name = pt.pop("name")
ref = PreTreatmentReference(name, pt)
pre_treatments.append(ref.resolve(spec))
metrics_with_treatments.append(
Summary(
metric=metric_summary[0].metric,
statistic=Statistic(statistic_name, stats_params),
pre_treatments=pre_treatments,
)
)
else:
metrics_with_treatments += metric_summary
elif metric:
if self.statistics is None:
raise ValueError(f"No statistical treatment defined for metric '{self.name}'")
for statistic_name, params in self.statistics.items():
stats_params = copy.deepcopy(params)
pre_treatments = []
for pt in stats_params.pop("pre_treatments", []):
if isinstance(pt, str):
ref = PreTreatmentReference(pt, {})
else:
name = pt.pop("name")
ref = PreTreatmentReference(name, pt)
pre_treatments.append(ref.resolve(spec))
metrics_with_treatments.append(
Summary(
metric=metric,
statistic=Statistic(statistic_name, stats_params),
pre_treatments=pre_treatments,
)
)
if len(metrics_with_treatments) == 0:
raise ValueError(f"Metric {self.name} has no statistical treatment defined.")
return metrics_with_treatments
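
    # The `statistics` mapping drives the Summary objects built above. For
    # example (illustrative statistic and pre-treatment names):
    #
    #     statistics = {"bootstrap_mean": {"pre_treatments": ["remove_nulls"]}}
    #
    # yields one Summary wrapping Statistic("bootstrap_mean", {}) plus the
    # resolved "remove_nulls" pre-treatment.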
def merge(self, other: "MetricDefinition"):
"""Merge with another metric definition."""
for key in attr.fields_dict(type(self)):
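            # Because of the `or`, falsy values on `other` (None, False, empty
            # list) never override an existing value on `self`.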
setattr(self, key, getattr(other, key) or getattr(self, key))


MetricsConfigurationType = Dict[AnalysisPeriod, List[Summary]]


@attr.s(auto_attribs=True)
class MetricsSpec:
"""Describes the interface for the metrics section in configuration."""
daily: List[MetricReference] = attr.Factory(list)
weekly: List[MetricReference] = attr.Factory(list)
days28: List[MetricReference] = attr.Factory(list)
overall: List[MetricReference] = attr.Factory(list)
preenrollment_weekly: List[MetricReference] = attr.Factory(list)
preenrollment_days28: List[MetricReference] = attr.Factory(list)
definitions: Dict[str, MetricDefinition] = attr.Factory(dict)

    @classmethod
def from_dict(cls, d: dict) -> "MetricsSpec":
params: Dict[str, Any] = {}
known_keys = {f.name for f in attr.fields(cls)}
for k in known_keys:
if k == "days28":
v = d.get("28_day", [])
else:
v = d.get(k, [])
if not isinstance(v, list):
raise ValueError(f"metrics.{k} should be a list of metrics")
params[k] = [MetricReference(m) for m in v]
params["definitions"] = {
k: converter.structure(
{"name": k, **dict((kk.lower(), vv) for kk, vv in v.items())},
MetricDefinition,
)
for k, v in d.items()
if k not in known_keys and k != "28_day"
}
return cls(**params)
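
    # Note the "28_day" alias above: {"28_day": ["active_hours"]} populates
    # the `days28` attribute (the metric name is illustrative).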

    def resolve(
self,
spec: "AnalysisSpec",
conf: Union["ExperimentConfiguration", "ProjectConfiguration"],
configs: "ConfigCollection",
) -> MetricsConfigurationType:
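        """Resolve the metric references for each analysis period."""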
result = {}
for period in AnalysisPeriod:
summaries = [
summary
for ref in getattr(self, period.table_suffix)
for summary in ref.resolve(spec, conf, configs)
]
unique_summaries = []
seen_summaries = set()
            # Summaries need to be reversed to make sure merged configs overwrite existing ones.
summaries.reverse()
for summary in summaries:
if (summary.metric.name, summary.statistic.name) not in seen_summaries:
seen_summaries.add((summary.metric.name, summary.statistic.name))
unique_summaries.append(summary)
result[period] = unique_summaries
return result
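
    # Deduplication keeps one Summary per (metric name, statistic name) pair;
    # reversing first means the later entry in the resolved list wins.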
def merge(self, other: "MetricsSpec"):
"""
Merges another metrics spec into the current one.
The `other` MetricsSpec overwrites existing metrics.
"""
self.daily = other.daily + self.daily
self.weekly = other.weekly + self.weekly
self.days28 = other.days28 + self.days28
self.overall = other.overall + self.overall
self.preenrollment_weekly = other.preenrollment_weekly + self.preenrollment_weekly
self.preenrollment_days28 = other.preenrollment_days28 + self.preenrollment_days28
seen = set()
        for key in self.definitions:
for other_key in other.definitions:
                # support fnmatch-style wildcard characters in `other`, e.g.
                # "some_metric*" merges into every matching definition key
other_key_regex = re.compile(fnmatch.translate(other_key))
if other_key_regex.fullmatch(key):
self.definitions[key].merge(other.definitions[other_key])
seen.add(other_key)
seen.add(key)
for key, definition in other.definitions.items():
if key not in seen and is_valid_slug(key):
self.definitions[key] = definition


converter.register_structure_hook(MetricsSpec, lambda obj, _type: MetricsSpec.from_dict(obj))