# auto_sizing/size_calculation.py (130 lines of code) (raw):
import json
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
import attr
from mozanalysis.bq import BigQueryContext, sanitize_table_name_for_bq
from mozanalysis.experiment import TimeLimits
from mozanalysis.frequentist_stats.sample_size import z_or_t_ind_sample_size_calc
from mozanalysis.sizing import HistoricalTarget
from pandas import DataFrame
import auto_sizing.errors as errors
from auto_sizing.export_json import export_sample_size_json
from auto_sizing.targets import SizingConfiguration
from auto_sizing.utils import delete_bq_table
@attr.s(auto_attribs=True)
class SizeCalculation:
    """Wrapper for size calculation for target recipe.

    Builds a targets table and a metrics table in BigQuery for the configured
    target, computes sample sizes for every parameter set in the
    configuration, and publishes the combined results as JSON.
    """

    # Project id used for the BigQuery context, table deletion and JSON export.
    project: str
    # Dataset id used for the BigQuery context.
    dataset: str
    # Destination handed to export_sample_size_json; when falsy and the
    # configuration has a config file, results are written next to that file.
    bucket: str
    # Sizing configuration (target slug, dates, targets, metrics, parameters).
    config: SizingConfiguration

    @property
    def bigquerycontext(self):
        """Return a newly constructed BigQueryContext for this project/dataset."""
        return BigQueryContext(project_id=self.project, dataset_id=self.dataset)

    def _validate_requested_timelimits(self, current_date: datetime) -> Optional[TimeLimits]:
        """
        Check that the requested window of data ends before ``current_date``.

        The last day of the window is ``start_date`` plus
        ``num_dates_enrollment + analysis_length - 1`` days.

        Returns a TimeLimits instance for that window.
        Raises AnalysisDatesNotAvailableException when the last day of the
        window falls on or after ``current_date``.
        """
        window_days = self.config.num_dates_enrollment + self.config.analysis_length - 1
        last_date_full_data = datetime.strptime(self.config.start_date, "%Y-%m-%d") + timedelta(
            days=window_days
        )

        # Ordering a ``date`` against a ``datetime`` raises TypeError, so reduce
        # a datetime to its calendar date first; a plain date is used as given.
        today = current_date.date() if isinstance(current_date, datetime) else current_date
        if last_date_full_data.date() >= today:
            raise errors.AnalysisDatesNotAvailableException(self.config.target_slug)

        # Arguments are passed positionally exactly as before; the literal 0 is
        # presumably the analysis start offset in days -- confirm against
        # mozanalysis' TimeLimits.for_single_analysis_window.
        return TimeLimits.for_single_analysis_window(
            self.config.start_date,
            last_date_full_data.strftime("%Y-%m-%d"),
            0,
            self.config.analysis_length,
            self.config.num_dates_enrollment,
        )

    def calculate_metrics(
        self,
        time_limits: TimeLimits,
        ht: HistoricalTarget,
    ) -> Tuple[DataFrame, str]:
        """
        Run the targets and metrics queries and return the metrics data.

        The targets query result is stored in an intermediate table which is
        deleted once the metrics query has finished, whether it succeeded or
        raised.

        Returns a tuple of (metrics DataFrame, metrics table name).
        """
        # Build the context once instead of once per property access.
        bq = self.bigquerycontext

        targets_sql = ht.build_targets_query(
            time_limits=time_limits,
            target_list=self.config.target_list,
        )
        targets_table_name = sanitize_table_name_for_bq(
            "_".join(["auto-sizing", self.config.target_slug])
        )
        bq.run_query(targets_sql, targets_table_name, replace_tables=True)
        qualified_targets_table = bq.fully_qualify_table_name(targets_table_name)

        try:
            metrics_sql = ht.build_metrics_query(
                time_limits=time_limits,
                metric_list=self.config.metric_list,
                targets_table=qualified_targets_table,
            )
            metrics_table_name = sanitize_table_name_for_bq(
                "_".join(["metrics-table", self.config.target_slug])
            )
            df = bq.run_query(metrics_sql, metrics_table_name, replace_tables=True).to_dataframe()
        finally:
            # The targets table is only an intermediate; remove it even when
            # the metrics step fails so it does not linger in the dataset.
            delete_bq_table(qualified_targets_table, self.project)

        return df, metrics_table_name

    def calculate_sample_sizes(
        self, metrics_table: DataFrame, parameters: Dict[str, float]
    ) -> Dict[str, Any]:
        """
        Compute sample sizes for every configured metric with one parameter set.

        ``parameters`` must contain the keys "effect_size" and "power".

        Returns a dict with the given "parameters" and a "metrics" mapping
        from each result key to its number_of_clients_targeted,
        sample_size_per_branch and population_percent_per_branch.
        """
        res = z_or_t_ind_sample_size_calc(
            df=metrics_table,
            metrics_list=self.config.metric_list,
            effect_size=parameters["effect_size"],
            power=parameters["power"],
        )
        # Only these fields of each per-metric result are published.
        reported_fields = (
            "number_of_clients_targeted",
            "sample_size_per_branch",
            "population_percent_per_branch",
        )
        metrics_results = {
            key: {field: res[key][field] for field in reported_fields} for key in res.keys()
        }
        return {
            "parameters": parameters,
            "metrics": metrics_results,
        }

    def publish_results(self, result_dict: Dict[str, Any], current_date: str) -> None:
        """
        Write the results as JSON, locally or through export_sample_size_json.

        When the configuration has a config file and no bucket is set, the
        JSON is written to ``<target_slug>.json`` in the config file's
        directory. Otherwise it is handed to export_sample_size_json together
        with the project, bucket, target slug and ``current_date``.
        """
        if self.config.config_file and not self.bucket:
            path = Path(self.config.config_file.name).parent / f"{self.config.target_slug}.json"
            path.write_text(json.dumps(result_dict))
            print(f"Results saved at {path}")
        else:
            export_sample_size_json(
                self.project,
                self.bucket,
                self.config.target_slug,
                json.dumps(result_dict),
                current_date,
            )

    def run(self, current_date: datetime) -> None:
        """
        Run the full sizing job for the configured target.

        Validates the requested dates, computes the metrics table, calculates
        sample sizes for each parameter set in the configuration and publishes
        the combined results. Nothing is published when the metrics table has
        no rows.
        """
        time_limits = self._validate_requested_timelimits(current_date)
        ht = HistoricalTarget(
            experiment_name=self.config.target_slug,
            start_date=self.config.start_date,
            analysis_length=self.config.analysis_length,
            num_dates_enrollment=self.config.num_dates_enrollment,
        )
        metrics_table, metrics_table_name = self.calculate_metrics(time_limits=time_limits, ht=ht)
        print(f"Metrics table saved at {metrics_table_name}")

        if len(metrics_table) == 0:
            print("No clients satisfied targeting.")
            return

        results_combined = {}
        for parameters in self.config.parameters:
            res = self.calculate_sample_sizes(metrics_table=metrics_table, parameters=parameters)
            # Record the parameter set alongside its results.
            res["parameters"] = parameters
            result_key = f"Power{parameters['power']}EffectSize{parameters['effect_size']}"
            results_combined[result_key] = res

        self.publish_results(results_combined, current_date.strftime("%Y-%m-%d"))