in jobs/webcompat-kb/webcompat_kb/metric.py [0:0]
def update_metric_history(client: BigQuery, bq_dataset_id: str, write: bool) -> None:
    """Snapshot the current topline-metric tables into their history tables.

    For each metric variant ("global_1000", "sightline", "all"), copy all rows
    from ``webcompat_topline_metric_<suffix>`` into
    ``webcompat_topline_metric_<suffix>_history``, stamped with today's date.
    At most one snapshot is recorded per day: if the most recent
    ``recorded_date`` in a history table is today (or later), that variant is
    skipped.

    :param client: BigQuery client wrapper providing ``ensure_table``,
        ``query`` and ``insert_rows``.
    :param bq_dataset_id: Dataset containing both the metric and history tables.
    :param write: Dry-run flag threaded through from the caller.  It is not
        consulted here; presumably ``client`` suppresses writes itself when
        this is False — TODO confirm against the client implementation.
    """
    # The history schema is identical for every variant; build it once rather
    # than once per loop iteration.
    history_schema = [
        bigquery.SchemaField("recorded_date", "DATE", mode="REQUIRED"),
        bigquery.SchemaField("date", "DATE", mode="REQUIRED"),
        bigquery.SchemaField("bug_count", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("needs_diagnosis_score", "NUMERIC", mode="REQUIRED"),
        bigquery.SchemaField("platform_score", "NUMERIC", mode="REQUIRED"),
        bigquery.SchemaField("not_supported_score", "NUMERIC", mode="REQUIRED"),
        bigquery.SchemaField("total_score", "NUMERIC", mode="REQUIRED"),
    ]
    # Compute the snapshot date once so every variant recorded in this run
    # carries the same recorded_date, even if the run straddles midnight.
    today = date.today()

    for suffix in ["global_1000", "sightline", "all"]:
        metrics_table_name = f"webcompat_topline_metric_{suffix}"
        history_table_name = f"webcompat_topline_metric_{suffix}_history"
        history_table = client.ensure_table(
            history_table_name, history_schema, recreate=False
        )

        # Only the latest snapshot date is needed to decide whether today's
        # data has already been recorded.
        last_recorded_query = f"""
        SELECT recorded_date
        FROM `{bq_dataset_id}.{history_table_name}`
        ORDER BY recorded_date DESC
        LIMIT 1
        """
        last_recorded = list(client.query(last_recorded_query))
        if last_recorded and last_recorded[0]["recorded_date"] >= today:
            # We've already recorded historic data today
            logging.info(
                "Already recorded historic data in %s today, skipping",
                history_table_name,
            )
            continue

        metrics_query = f"""
        SELECT *
        FROM `{bq_dataset_id}.{metrics_table_name}`
        """
        history_rows = [
            {
                "recorded_date": today,
                "date": row.date,
                "bug_count": row.bug_count,
                "needs_diagnosis_score": row.needs_diagnosis_score,
                "platform_score": row.platform_score,
                "not_supported_score": row.not_supported_score,
                "total_score": row.total_score,
            }
            for row in client.query(metrics_query)
        ]
        client.insert_rows(history_table, history_rows)