in experimenter/experimenter/jetstream/client.py [0:0]
def get_experiment_data(experiment: NimbusExperiment):
    """Fetch and assemble Jetstream analysis results for *experiment*.

    Pulls metadata, per-window results, and analysis errors from Jetstream
    storage, transforms the raw data points into the nested structure the
    Experimenter UI consumes, and returns it under the "v3" key.

    Returns:
        dict: ``{"v3": <results keyed by window/basis/segment, plus
        "show_analysis", "metadata", "errors", and optionally
        "other_metrics">}``
    """
    # Jetstream stores results under the slug with dashes replaced by underscores.
    recipe_slug = experiment.slug.replace("-", "_")
    # we don't use DAILY results in Experimenter, so only get WEEKLY/OVERALL.
    # WEEKLY must come first: OVERALL retention data is derived from it.
    windows = [AnalysisWindow.WEEKLY, AnalysisWindow.OVERALL]
    raw_data = {
        AnalysisWindow.WEEKLY: {},
        AnalysisWindow.OVERALL: {},
    }
    runtime_errors = []

    experiment_metadata = None
    try:
        experiment_metadata = get_metadata(recipe_slug)
    except RuntimeError as e:
        runtime_errors.append(str(e))
    outcomes_metadata = (
        experiment_metadata.get("outcomes") if experiment_metadata is not None else None
    )

    experiment_errors = None
    try:
        experiment_errors = get_analysis_errors(recipe_slug)
    except RuntimeError as e:
        runtime_errors.append(str(e))

    experiment_data = {
        "show_analysis": settings.FEATURE_ANALYSIS,
        "metadata": experiment_metadata,
    }
    for window in windows:
        experiment_data[window] = {}
        data_from_jetstream = []
        try:
            data_from_jetstream = get_data(recipe_slug, window)
        except RuntimeError as e:
            runtime_errors.append(str(e))

        # Bucket raw points by segment per analysis basis; the per-basis
        # result dicts are created lazily so a basis absent from the data
        # never appears in the output.
        segment_points_enrollments = defaultdict(list)
        segment_points_exposures = defaultdict(list)
        for point in data_from_jetstream:
            segment_key = point["segment"]
            if point["analysis_basis"] == AnalysisBasis.ENROLLMENTS:
                segment_points_enrollments[segment_key].append(point)
                experiment_data[window][AnalysisBasis.ENROLLMENTS] = {}
                raw_data[window][AnalysisBasis.ENROLLMENTS] = {}
            elif point["analysis_basis"] == AnalysisBasis.EXPOSURES:
                segment_points_exposures[segment_key].append(point)
                experiment_data[window][AnalysisBasis.EXPOSURES] = {}
                raw_data[window][AnalysisBasis.EXPOSURES] = {}

        _process_segments(
            experiment,
            experiment_data,
            raw_data,
            window,
            AnalysisBasis.ENROLLMENTS,
            segment_points_enrollments,
            outcomes_metadata,
            collect_other_metrics=True,
        )
        _process_segments(
            experiment,
            experiment_data,
            raw_data,
            window,
            AnalysisBasis.EXPOSURES,
            segment_points_exposures,
            outcomes_metadata,
        )

    experiment_data["errors"] = _collect_errors(
        experiment, experiment_metadata, experiment_errors, runtime_errors
    )
    return {
        "v3": experiment_data,
    }


def _process_segments(
    experiment,
    experiment_data,
    raw_data,
    window,
    basis,
    segment_points,
    outcomes_metadata,
    *,
    collect_other_metrics=False,
):
    """Transform bucketed Jetstream points for one (window, basis) pair.

    Stores the raw ``JetstreamData`` per segment in ``raw_data[window][basis]``
    and the serialized results object in ``experiment_data[window][basis]``.
    When *collect_other_metrics* is set (enrollments basis only), the
    ``other_metrics`` map from the ALL segment of the OVERALL window is copied
    onto ``experiment_data``.
    """
    for segment, segment_data in segment_points.items():
        data = raw_data[window][basis][segment] = JetstreamData(segment_data)
        (
            result_metrics,
            primary_metrics_set,
            other_metrics,
        ) = get_results_metrics_map(
            data,
            experiment.primary_outcomes,
            experiment.secondary_outcomes,
            outcomes_metadata,
        )
        if data and window == AnalysisWindow.OVERALL:
            # Append some values onto the incoming Jetstream data; retention
            # relies on the WEEKLY window having been processed first.
            data.append_population_percentages()
            data.append_retention_data(
                raw_data[AnalysisWindow.WEEKLY][basis][segment]
            )
            # Create the output object (overall data)
            ResultsObjectModel = create_results_object_model(data)
            data = ResultsObjectModel(result_metrics, data, experiment)
            data.append_conversion_count(primary_metrics_set)
            if collect_other_metrics and segment == Segment.ALL:
                experiment_data["other_metrics"] = other_metrics
        elif data and window == AnalysisWindow.WEEKLY:
            # Create the output object (weekly data)
            ResultsObjectModel = create_results_object_model(data)
            data = ResultsObjectModel(result_metrics, data, experiment, window)
        # Convert output object to dict and put into the final object
        experiment_data[window][basis][segment] = (
            data.model_dump(exclude_none=True) or None
        )


def _collect_errors(experiment, experiment_metadata, experiment_errors, runtime_errors):
    """Group analysis errors by metric slug and fold in client runtime errors.

    Returns a dict keyed by metric slug; experiment-wide errors (no metric,
    plus qualifying runtime errors) go under the "experiment" key.
    """
    errors_by_metric = {}
    errors_experiment_overall = []
    if experiment_errors is not None:
        for err in experiment_errors:
            metric_slug = err.get("metric")
            # Non-None slug implies the "metric" key is present, so the
            # original extra membership check was redundant.
            if metric_slug is not None:
                errors_by_metric.setdefault(metric_slug, []).append(err)
            else:
                # Metric-less errors apply to the whole experiment; only keep
                # those raised during/after the current analysis run.
                try:
                    analysis_start_time = datetime.fromisoformat(
                        experiment_metadata.get("analysis_start_time")
                        if experiment_metadata is not None
                        else ""
                    )
                    timestamp = datetime.fromisoformat(err.get("timestamp"))
                    if timestamp >= analysis_start_time:
                        errors_experiment_overall.append(err)
                except (ValueError, TypeError, KeyError):
                    # ill-formatted/missing timestamp: default to including the error
                    errors_experiment_overall.append(err)
    for e in runtime_errors:
        # only store runtime errors for overall window if we expect those results
        # (and lag by one day so there is time for analysis to complete)
        if "overall.json" not in e or (
            experiment.end_date
            and experiment.end_date < (date.today() - timedelta(days=1))
        ):
            analysis_error = AnalysisError(
                experiment=experiment.slug,
                filename="experimenter/jetstream/client.py",
                func_name="load_data_from_gcs",
                log_level="WARNING",
                message=e,
                timestamp=timezone.now(),
            )
            errors_experiment_overall.append(analysis_error.model_dump())
    errors_by_metric["experiment"] = errors_experiment_overall
    return errors_by_metric