in jetstream/analysis.py [0:0]
def ensure_enrollments(self, current_date: datetime) -> None:
    """Create the experiment's enrollments table unless it already exists.

    The enrollments query is built for the time limits of the daily analysis
    period and executed with the ``WRITE_EMPTY`` write disposition. A
    ``Conflict`` raised while doing so is treated as "nothing to do": it is
    logged and swallowed, and this method does not refresh or re-create the
    table in that case.

    Args:
        current_date: Date passed to ``_get_timelimits_if_ready`` to decide
            whether enrollments can be computed yet.

    Raises:
        errors.NoStartDateException: If time limits are available but the
            experiment has no start date.
    """
    experiment = self.config.experiment

    time_limits = self._get_timelimits_if_ready(AnalysisPeriod.DAY, current_date)
    if time_limits is None:
        logger.info("Skipping enrollments for %s; not ready", experiment.normandy_slug)
        return

    if experiment.start_date is None:
        raise errors.NoStartDateException(experiment.normandy_slug)

    enrollments_table = f"enrollments_{bq_normalize_name(experiment.normandy_slug)}"
    logger.info("Create %s", enrollments_table)
    enrollments_sql = self.enrollments_query(time_limits=time_limits)

    try:
        self._write_sql_output(enrollments_table, enrollments_sql)
        # WRITE_EMPTY asks BigQuery to write only into an empty destination;
        # presumably a pre-existing table surfaces as the Conflict handled
        # below -- confirm against self.bigquery.execute.
        results = self.bigquery.execute(
            enrollments_sql,
            enrollments_table,
            google.cloud.bigquery.job.WriteDisposition.WRITE_EMPTY,
            experiment_slug=experiment.normandy_slug,
        )
        logger.info(
            "Enrollment query cost: %s", results.slot_millis * COST_PER_SLOT_MS
        )
    except Conflict:
        # Deliberate best-effort: keep whatever is already there, but leave a
        # trace so a skipped creation is distinguishable from a fresh one.
        logger.info(
            "Enrollments table %s already exists; skipping creation",
            enrollments_table,
        )