lib/metric-config-parser/metric_config_parser/experiment.py (214 lines of code) (raw):
import datetime as dt
import enum
from typing import TYPE_CHECKING, Any, List, Optional
import attr
import jinja2
from jinja2 import StrictUndefined
from mozilla_nimbus_schemas import RandomizationUnit
if TYPE_CHECKING:
from .config import ConfigCollection
from .analysis import AnalysisSpec
from . import AnalysisUnit
from .errors import NoEndDateException, NoStartDateException
from .exposure_signal import ExposureSignal, ExposureSignalDefinition
from .segment import Segment, SegmentReference
from .util import parse_date
class Channel(enum.Enum):
"""Release channel."""
NIGHTLY = "nightly"
BETA = "beta"
RELEASE = "release"
@classmethod
def has_value(cls, value: str) -> bool:
"""Check if a specific value is represented by the enum."""
return value in cls._value2member_map_ # type: ignore
@attr.s(auto_attribs=True, kw_only=True, slots=True, frozen=True)
class Branch:
slug: str
ratio: int
@attr.s(auto_attribs=True, kw_only=True, slots=True, frozen=True)
class BucketConfig:
randomization_unit: str
namespace: str
start: int
count: int
total: int = 10000
@attr.s(auto_attribs=True, kw_only=True, slots=True, frozen=True)
class Experiment:
"""
Common experiment representation.
Attributes:
experimenter_slug: Slug generated by Experimenter for V1 experiments;
None for V6 experiments
normandy_slug: V1 experiment normandy_slug; V6 experiment slug
type: V1 experiment type; always "v6" for V6 experiments
status: V1 experiment status; "Live" for active V6 experiments,
"Complete" for V6 experiments with endDate in the past
branches: V1 experiment variants converted to branches; V6 experiment branches
start_date: experiment start_date
end_date: experiment end_date
proposed_enrollment: experiment proposed_enrollment
reference_branch: V1 experiment branch slug where is_control is True;
V6 experiment reference_branch
enrollment_end_date: experiment enrollment_end_date
is_enrollment_paused: True if enrollment has ended;
needed because enrollment_end_date may be computed/proposed
"""
experimenter_slug: Optional[str]
normandy_slug: Optional[str]
type: str
status: Optional[str]
branches: List[Branch]
start_date: Optional[dt.datetime]
end_date: Optional[dt.datetime]
proposed_enrollment: Optional[int]
reference_branch: Optional[str]
is_high_population: bool
app_name: str
bucket_config: Optional[BucketConfig] = None
is_enrollment_paused: Optional[bool] = None
app_id: Optional[str] = None
outcomes: List[str] = attr.Factory(list)
segments: List[str] = attr.Factory(list)
enrollment_end_date: Optional[dt.datetime] = None
boolean_pref: Optional[str] = None
channel: Optional[Channel] = None
is_rollout: bool = False
@attr.s(auto_attribs=True)
class ExperimentConfiguration:
"""Represents the configuration of an experiment for analysis."""
experiment_spec: "ExperimentSpec"
experiment: "Experiment"
segments: List[Segment]
exposure_signal: Optional[ExposureSignal] = None
# int <= 100 represents the percentage of clients for downsampling enrollments
sample_size: Optional[int] = None
def __attrs_post_init__(self):
# Catch any exceptions at instantiation
self._enrollment_query = self.enrollment_query
@property
def enrollment_query(self) -> Optional[str]:
if self.experiment_spec.enrollment_query is None:
return None
cached = getattr(self, "_enrollment_query", None)
if cached:
return cached
class ExperimentProxy:
@property
def enrollment_query(proxy):
raise ValueError()
def __getattr__(proxy, name):
return getattr(self, name)
env = jinja2.Environment(autoescape=False, undefined=StrictUndefined)
return env.from_string(self.experiment_spec.enrollment_query).render(
experiment=ExperimentProxy()
)
@property
def proposed_enrollment(self) -> int:
return self.experiment_spec.enrollment_period or self.experiment.proposed_enrollment or 0
@property
def enrollment_end_date(self) -> Optional[dt.datetime]:
return self.experiment.enrollment_end_date
@property
def is_enrollment_paused(self) -> Optional[bool]:
return self.experiment.is_enrollment_paused
@property
def bucket_count(self) -> Optional[int]:
if hasattr(self.experiment, "bucket_config") and self.experiment.bucket_config is not None:
return self.experiment.bucket_config.count
return None
@property
def bucket_start(self) -> Optional[int]:
if hasattr(self.experiment, "bucket_config") and self.experiment.bucket_config is not None:
return self.experiment.bucket_config.start
return None
@property
def randomization_unit(self) -> Optional[RandomizationUnit]:
if hasattr(self.experiment, "bucket_config") and self.experiment.bucket_config is not None:
# this will raise a ValueError if the provided randomization_unit is invalid
return RandomizationUnit(self.experiment.bucket_config.randomization_unit)
return None
@property
def analysis_unit(self) -> Optional[AnalysisUnit]:
"""Retrieve the appropriate analysis unit, which is
derived from the experiment's randomization unit.
"""
if self.randomization_unit and self.randomization_unit == RandomizationUnit.GROUP_ID:
return AnalysisUnit.PROFILE_GROUP
return AnalysisUnit.CLIENT
@property
def enrollment_period(self) -> int:
if self.experiment_spec.enrollment_period is not None:
return self.experiment_spec.enrollment_period
elif self.enrollment_end_date is not None and self.start_date is not None:
return (self.enrollment_end_date - self.start_date).days + 1
return self.proposed_enrollment or 0
@property
def reference_branch(self) -> Optional[str]:
return self.experiment_spec.reference_branch or self.experiment.reference_branch
@property
def start_date(self) -> Optional[dt.datetime]:
return parse_date(self.experiment_spec.start_date) or self.experiment.start_date
@property
def end_date(self) -> Optional[dt.datetime]:
return parse_date(self.experiment_spec.end_date) or self.experiment.end_date
@property
def status(self) -> Optional[str]:
"""Assert the experiment is Complete if an end date is provided.
Functionally, this lets the Overall metrics run on the specified date.
"""
return "Complete" if self.experiment_spec.end_date else self.experiment.status
# Helpers for configuration templates
@property
def start_date_str(self) -> str:
if not self.start_date:
raise NoStartDateException(self.normandy_slug)
return self.start_date.strftime("%Y-%m-%d")
@property
def end_date_str(self) -> str:
if not self.end_date:
raise NoEndDateException(self.normandy_slug)
return self.end_date.strftime("%Y-%m-%d")
@property
def last_enrollment_date_str(self) -> str:
if not self.start_date:
raise NoStartDateException(self.normandy_slug)
return (self.start_date + dt.timedelta(days=self.enrollment_period)).strftime("%Y-%m-%d")
@property
def skip(self) -> bool:
return self.experiment_spec.skip
@property
def is_private(self) -> bool:
return self.experiment_spec.is_private
@property
def app_name(self) -> str:
return self.experiment.app_name
@property
def dataset_id(self) -> Optional[str]:
return self.experiment_spec.dataset_id
def has_external_config_overrides(self) -> bool:
"""Check whether the external config overrides experiment configuration."""
return (
self.reference_branch != self.experiment.reference_branch
or self.start_date != self.experiment.start_date
or self.end_date != self.experiment.end_date
or self.proposed_enrollment != self.experiment.proposed_enrollment
or self.enrollment_end_date != self.experiment.enrollment_end_date
)
# see https://stackoverflow.com/questions/50888391/pickle-of-object-with-getattr-method-in-
# python-returns-typeerror-object-no
def __getstate__(self):
return vars(self)
def __setstate__(self, state):
vars(self).update(state)
def __getattr__(self, name: str) -> Any:
if "experiment" not in vars(self):
raise AttributeError
return getattr(self.experiment, name)
def _validate_yyyy_mm_dd(instance: Any, attribute: Any, value: Any) -> None:
parse_date(value)
def _validate_dataset_id(instance: Any, attribute, value):
if instance.is_private and value is None:
raise ValueError("dataset_id must be set to a custom dataset for private experiments")
@attr.s(auto_attribs=True, kw_only=True)
class ExperimentSpec:
"""Describes the interface for overriding experiment details."""
enrollment_query: Optional[str] = None
enrollment_period: Optional[int] = None
reference_branch: Optional[str] = None
start_date: Optional[str] = attr.ib(default=None, validator=_validate_yyyy_mm_dd)
end_date: Optional[str] = attr.ib(default=None, validator=_validate_yyyy_mm_dd)
segments: List[SegmentReference] = attr.Factory(list)
skip: bool = False
exposure_signal: Optional[ExposureSignalDefinition] = None
is_private: bool = False
dataset_id: Optional[str] = attr.ib(default=None, validator=_validate_dataset_id)
sample_size: Optional[int] = None
def resolve(
self,
spec: "AnalysisSpec",
experiment: "Experiment",
configs: "ConfigCollection",
) -> ExperimentConfiguration:
experiment_config = ExperimentConfiguration(self, experiment, [])
# Segment data sources may need to know the enrollment dates of the experiment,
# so we'll forward the Experiment we know about so far.
experiment_segments = [SegmentReference(seg) for seg in experiment.segments]
all_segments = []
for seg in self.segments + experiment_segments:
if seg not in all_segments:
all_segments.append(seg)
experiment_config.segments = [
ref.resolve(spec, experiment_config, configs) for ref in all_segments
]
experiment_config.sample_size = self.sample_size
if self.exposure_signal:
experiment_config.exposure_signal = self.exposure_signal.resolve(
spec, conf=experiment_config, configs=configs
)
return experiment_config
def merge(self, other: "ExperimentSpec") -> None:
for key in attr.fields_dict(type(self)):
setattr(self, key, getattr(other, key) or getattr(self, key))