sql/moz-fx-data-experiments/monitoring/experimenter_experiments_v1/query.py
#!/usr/bin/env python3

"""Import experiments from Experimenter via the Experimenter API."""

import datetime
import json
import sys
import time
from argparse import ArgumentParser
from typing import List, Optional

import attr
import cattrs
import pytz
import requests
from google.cloud import bigquery

# Experimenter v8 API endpoint; serves Nimbus experiments
EXPERIMENTER_API_URL_V8 = (
    "https://experimenter.services.mozilla.com/api/v8/experiments/"
)
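
# Illustrative shape of one record returned by the v8 API, assuming only the
# fields this script consumes (real responses carry additional keys, and the
# values here are hypothetical):
#
#     {
#         "slug": "some-experiment",
#         "startDate": "2024-01-02T00:00:00Z",
#         "endDate": null,
#         "enrollmentEndDate": "2024-01-16T00:00:00Z",
#         "proposedEnrollment": 14,
#         "referenceBranch": "control",
#         "appName": "firefox_desktop",
#         "appId": "firefox-desktop",
#         "channel": "release",
#         "targeting": "true",
#         "bucketConfig": {"namespace": "...", "count": 1000, "total": 10000},
#         "featureIds": ["some-feature"],
#         "branches": [{"slug": "control", "ratio": 1, "features": {}}],
#     }
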
parser = ArgumentParser(description=__doc__)
parser.add_argument("--project", default="moz-fx-data-experiments")
parser.add_argument("--destination_dataset", default="monitoring")
parser.add_argument("--destination_table", default="experimenter_experiments_v1")
parser.add_argument("--sql_dir", default="sql/")
parser.add_argument("--dry_run", action="store_true")


@attr.s(auto_attribs=True)
class Branch:
    """Defines a branch."""

    slug: str
ratio: int
features: Optional[dict]


@attr.s(auto_attribs=True)
class Experiment:
    """Defines an Experiment."""

    experimenter_slug: Optional[str]
normandy_slug: Optional[str]
type: str
status: Optional[str]
branches: List[Branch]
start_date: Optional[datetime.datetime]
end_date: Optional[datetime.datetime]
enrollment_end_date: Optional[datetime.datetime]
proposed_enrollment: Optional[int]
reference_branch: Optional[str]
is_high_population: bool
app_name: str
app_id: str
channel: str
targeting: str
targeted_percent: float
namespace: Optional[str]
feature_ids: List[str]


@attr.s(auto_attribs=True)
class NimbusExperiment:
    """Represents a v8 Nimbus experiment from Experimenter."""

    slug: str  # Normandy slug
startDate: Optional[datetime.datetime]
endDate: Optional[datetime.datetime]
enrollmentEndDate: Optional[datetime.datetime]
proposedEnrollment: int
branches: List[Branch]
referenceBranch: Optional[str]
appName: str
appId: str
channel: str
targeting: str
bucketConfig: dict
    featureIds: List[str]

@classmethod
def from_dict(cls, d) -> "NimbusExperiment":
"""Load an experiment from dict."""
        converter = cattrs.BaseConverter()
        # API timestamps are ISO 8601 strings, sometimes with a trailing "Z";
        # normalize them to timezone-aware UTC datetimes.
        converter.register_structure_hook(
            datetime.datetime,
            lambda num, _: datetime.datetime.fromisoformat(
                num.replace("Z", "+00:00")
            ).astimezone(pytz.utc),
        )
        # Branches are built field-by-field rather than structured recursively.
        converter.register_structure_hook(
            Branch,
            lambda b, _: Branch(
                slug=b["slug"], ratio=b["ratio"], features=b["features"]
            ),
        )
        return converter.structure(d, cls)

def to_experiment(self) -> "Experiment":
"""Convert to Experiment."""
return Experiment(
normandy_slug=self.slug,
experimenter_slug=None,
type="v6",
            # An experiment is live while it has no end date or its end date
            # is still in the future.
            status=(
                "Live"
                if self.endDate is None
                or self.endDate > pytz.utc.localize(datetime.datetime.now())
                else "Complete"
            ),
start_date=self.startDate,
end_date=self.endDate,
enrollment_end_date=self.enrollmentEndDate,
proposed_enrollment=self.proposedEnrollment,
reference_branch=self.referenceBranch,
is_high_population=False,
branches=self.branches,
app_name=self.appName,
app_id=self.appId,
channel=self.channel,
targeting=self.targeting,
targeted_percent=self.bucketConfig["count"] / self.bucketConfig["total"],
namespace=self.bucketConfig["namespace"],
feature_ids=self.featureIds,
)
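

# Hypothetical usage of the classes above (variable names are illustrative):
#
#     payload = fetch(EXPERIMENTER_API_URL_V8)[0]  # one record, shaped as sketched
#     exp = NimbusExperiment.from_dict(payload).to_experiment()
#     # bucketConfig {"count": 1000, "total": 10000} -> targeted_percent == 0.1
#     # status is "Live" while endDate is unset or in the future
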
def fetch(url):
    """Fetch a URL as JSON, retrying once on failure."""
    last_exception = None
    for _ in range(2):
        try:
            return requests.get(
                url,
                timeout=30,
                headers={"user-agent": "https://github.com/mozilla/bigquery-etl"},
            ).json()
        except Exception as e:
            last_exception = e
            time.sleep(1)
    raise last_exception


def get_experiments() -> List[Experiment]:
"""Fetch experiments from Experimenter."""
nimbus_experiments_json = fetch(EXPERIMENTER_API_URL_V8)
nimbus_experiments = []
for experiment in nimbus_experiments_json:
try:
nimbus_experiments.append(
NimbusExperiment.from_dict(experiment).to_experiment()
)
except Exception as e:
print(f"Cannot import experiment: {experiment}: {e}")
    return nimbus_experiments


def main():
    """Fetch experiments and load them into BigQuery."""
args = parser.parse_args()
experiments = get_experiments()
destination_table = (
f"{args.project}.{args.destination_dataset}.{args.destination_table}"
)
bq_schema = (
bigquery.SchemaField("experimenter_slug", "STRING"),
bigquery.SchemaField("normandy_slug", "STRING"),
bigquery.SchemaField("type", "STRING"),
bigquery.SchemaField("status", "STRING"),
bigquery.SchemaField("start_date", "DATE"),
bigquery.SchemaField("end_date", "DATE"),
bigquery.SchemaField("enrollment_end_date", "DATE"),
bigquery.SchemaField("proposed_enrollment", "INTEGER"),
bigquery.SchemaField("reference_branch", "STRING"),
bigquery.SchemaField("is_high_population", "BOOL"),
bigquery.SchemaField(
"branches",
"RECORD",
mode="REPEATED",
fields=[
bigquery.SchemaField("slug", "STRING"),
bigquery.SchemaField("ratio", "INTEGER"),
bigquery.SchemaField("features", "JSON"),
],
),
bigquery.SchemaField("app_id", "STRING"),
bigquery.SchemaField("app_name", "STRING"),
bigquery.SchemaField("channel", "STRING"),
bigquery.SchemaField("targeting", "STRING"),
bigquery.SchemaField("targeted_percent", "FLOAT"),
bigquery.SchemaField("namespace", "STRING"),
bigquery.SchemaField("feature_ids", "STRING", mode="REPEATED"),
)
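
    # Illustrative (hypothetical values) shape of one serialized row produced
    # below by converter.unstructure() and loaded against this schema:
    #
    #     {"experimenter_slug": None, "normandy_slug": "demo", "type": "v6",
    #      "status": "Live", "start_date": "2024-01-02", "end_date": None, ...}
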
    job_config = bigquery.LoadJobConfig(
        schema=bq_schema,
        write_disposition=bigquery.job.WriteDisposition.WRITE_TRUNCATE,
    )
    # Serialize datetimes as ISO dates (e.g. "2024-01-02") to match the DATE
    # columns declared above.
    converter = cattrs.BaseConverter()
    converter.register_unstructure_hook(
        datetime.datetime, lambda d: d.strftime("%Y-%m-%d")
    )
blob = converter.unstructure(experiments)
if args.dry_run:
print(json.dumps(blob))
sys.exit(0)
    client = bigquery.Client(args.project)
    # Block until the load job finishes so failures surface here.
    client.load_table_from_json(
        blob, destination_table, job_config=job_config
    ).result()
    print(f"Loaded {len(experiments)} experiments")


if __name__ == "__main__":
main()