# jobs/webcompat-kb/webcompat_kb/utils.py
def backfill_history() -> None:
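    """Backfill the bugs_history table in the destination dataset.

    Existing history rows are read from both the source and destination
    datasets, merged per (number, who, change_time) key, and the merged
    result is written back to the destination table.
    """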
logging.basicConfig()
parser = get_parser_backfill_history()
args = parser.parse_args()
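    # logging.getLevelNamesMapping() requires Python 3.11+.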
logging.getLogger().setLevel(logging.getLevelNamesMapping()[args.log_level.upper()])
src_dataset = args.bq_kb_src_dataset
dest_dataset = args.bq_kb_dest_dataset
client = BigQuery(get_client(args.bq_project_id), dest_dataset, args.write)
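    # Map each HistoryKey (number, who, change_time) to its list of change
    # entries, separately for the destination and source datasets.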
existing_records_dest: dict[HistoryKey, list[dict[str, str]]] = {}
existing_records_src: dict[HistoryKey, list[dict[str, str]]] = {}
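    # Load the existing history rows from both datasets, merging the change
    # lists of any rows that share the same key.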
for dataset, records in [
(dest_dataset, existing_records_dest),
(src_dataset, existing_records_src),
]:
for row in client.query("""SELECT * FROM bugs_history""", dataset_id=dataset):
key = HistoryKey(row.number, row.who, row.change_time)
if key in records:
                logging.warning(
                    f"Got duplicate data in {dataset} for {key}: {row.changes}, {records[key]}"
                )
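                # Merge the duplicate row's changes, skipping entries that
                # are already recorded for this key.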
for change in row.changes:
if change not in records[key]:
records[key].append(change)
else:
records[key] = row.changes
logging.info(
f"Started with {len(existing_records_src)} records in {src_dataset} and {len(existing_records_dest)} in {dest_dataset}"
)
new_records: list[tuple[datetime, Mapping[str, Json]]] = []
new_count = 0
updated_count = 0
unchanged_count = 0
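    # Merge source records into the destination view: identical rows are
    # counted as unchanged, differing rows are merged and counted as
    # updated, and source-only rows are counted as new.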
for key, changes in existing_records_src.items():
if key in existing_records_dest:
existing = [
normalize_change(change) for change in existing_records_dest[key]
]
new = [normalize_change(change) for change in changes]
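            # Rows whose change lists contain the same entries (in any
            # order) are left untouched.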
if new == existing or (
all(item in existing for item in new)
and all(item in new for item in existing)
):
unchanged_count += 1
else:
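                # Changes present in the destination but missing from the
                # source are appended so no history is dropped.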
missing = [item for item in existing if item not in new]
if missing:
logging.warning(
f"Updating record {key}, merging {new} with {existing}"
)
changes.extend(missing)
updated_count += 1
else:
new_count += 1
new_records.append(
(
key.change_time,
{
"number": key.number,
"who": key.who,
"change_time": key.change_time.isoformat(),
"changes": changes,
},
)
)
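    # Carry over destination-only records that have no counterpart in the
    # source dataset.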
for key, changes in existing_records_dest.items():
if key not in existing_records_src:
unchanged_count += 1
new_records.append(
(
key.change_time,
{
"number": key.number,
"who": key.who,
"change_time": key.change_time.isoformat(),
"changes": changes,
},
)
)
logging.info(
f"Writing {len(new_records)} records to {dest_dataset}, {unchanged_count} unchanged, {updated_count} updated, {new_count} new"
)
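    # Sort on change_time only (the dict payloads are not orderable) so the
    # table is written in chronological order.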
    new_records.sort(key=lambda item: item[0])
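    # Schema of the rewritten bugs_history table.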
schema = [
bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("who", "STRING", mode="REQUIRED"),
bigquery.SchemaField("change_time", "TIMESTAMP", mode="REQUIRED"),
bigquery.SchemaField(
"changes",
"RECORD",
mode="REPEATED",
fields=[
bigquery.SchemaField("field_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("added", "STRING", mode="REQUIRED"),
bigquery.SchemaField("removed", "STRING", mode="REQUIRED"),
],
),
]
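    # Rewrite the destination table with the merged, sorted records.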
client.write_table(
"bugs_history", schema, [item[1] for item in new_records], overwrite=True
)