in opmon/monitoring.py [0:0]
def _run_metrics_sql(self, submission_date: datetime):
    """Run the metrics query for ``submission_date`` and store its result.

    The query text comes from ``self._get_metrics_sql`` and is handed to
    ``self.bigquery.execute``, which is asked to overwrite (``WRITE_TRUNCATE``)
    the ``<table>$YYYYMMDD`` destination for the given date inside
    ``self.derived_dataset``.

    If ``self._check_runnable`` raises, the error is printed and the method
    returns without executing anything.

    Args:
        submission_date: Date the metrics are computed for; also used to
            build the date suffix of the destination table.

    Returns:
        None.
    """
    # Guard: a project that fails the runnability check is skipped, not fatal.
    try:
        self._check_runnable(submission_date)
    except Exception as err:
        print(f"Failed to run opmon project: {err}")
        return

    metrics_table = f"{self.normalized_slug}_v{SCHEMA_VERSIONS['metric']}"

    # Work on a copy so the module-level key list is never mutated, then add
    # one key per configured dimension.
    keys = copy.deepcopy(METRICS_JOIN_KEYS)
    keys.extend(dimension.name for dimension in self.config.dimensions)

    run_query = self.bigquery.execute
    metrics_query = self._get_metrics_sql(
        submission_date=submission_date, table_name=metrics_table
    )
    # "$YYYYMMDD" suffix targets a single day of the table
    # (presumably a BigQuery partition decorator -- confirm against the client wrapper).
    destination = f"{metrics_table}${submission_date:%Y%m%d}"

    run_query(
        metrics_query,
        destination_table=destination,
        clustering=["build_id"],
        time_partitioning="submission_date",
        partition_expiration_ms=TABLE_EXPIRATION_MS,
        write_disposition=bigquery.job.WriteDisposition.WRITE_TRUNCATE,
        dataset=self.derived_dataset,
        join_keys=keys,
        annotations={
            "slug": self.slug,
            "type": "metrics_query",
            "submission_date": submission_date,
        },
    )