in jobs/fxci-taskcluster-export/fxci_etl/loaders/bigquery.py [0:0]
def insert(self, records: list[Record] | Record, use_backup=True):
    """Insert records into the table.

    Args:
        records (list[Record] | Record): Record or list of records to insert
            into the table.
        use_backup (bool): If True, also retry records that failed on a
            previous run and persist any new failures for the next run.
    """
    if isinstance(records, Record):
        records = [records]

    if use_backup:
        try:
            # Load previously failed records from storage; the problem that
            # caused them to fail may have been resolved since.
            for obj in json.loads(self._record_backup.download_as_string()):
                records.append(Record.from_dict(obj))
        except NotFound:
            # No backup blob exists yet, so there is nothing to retry.
            pass

    logger.info(f"{len(records)} records to insert into table '{self.table_name}'")

    # There's a 10MB limit on the `insert_rows` request, so submit rows in
    # batches to avoid exceeding it.
    errors = []
    failed_records = []
    for batch in batched(records, self.CHUNK_SIZE):
        logger.debug(f"Inserting batch of {len(batch)} records")
        batch_errors = self.client.insert_rows(
            self.table, [asdict(row) for row in batch], retry=False
        )
        errors.extend(batch_errors)
        # `insert_rows` reports one error mapping per failed row, keyed by the
        # row's index within the submitted batch; keep the matching records so
        # they can be written to the backup blob and retried later.
        failed_records.extend(batch[error["index"]] for error in batch_errors)

    if errors:
        logger.error("The following records failed:")
        pprint(errors)

    num_inserted = len(records) - len(errors)
    logger.info(
        f"Successfully inserted {num_inserted} records into table '{self.table_name}'"
    )

    if use_backup:
        # Persist failed records (as dicts) so the next run can reload them
        # with `Record.from_dict` and retry the insert.
        self._record_backup.upload_from_string(
            json.dumps([asdict(record) for record in failed_records])
        )
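
# Note: `batched` above presumably comes from `itertools.batched` (Python
# 3.12+) or an equivalent helper defined elsewhere in the project; which one
# is not shown in this excerpt. A minimal sketch of such a helper for older
# interpreters, using only the standard library:
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def batched(iterable: Iterable[T], n: int) -> Iterator[tuple[T, ...]]:
    """Yield successive tuples of at most `n` items from `iterable`."""
    if n < 1:
        raise ValueError("n must be at least one")
    it = iter(iterable)
    while batch := tuple(islice(it, n)):
        yield batch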
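
# Hypothetical usage sketch -- the loader's class name, constructor, and the
# Record payload below are assumptions for illustration; only insert()'s
# signature comes from the method above:
#
#     loader = BigQueryLoader(config)  # assumed: built from the ETL config
#     loader.insert(Record.from_dict({"task_id": "abc", "run_id": 0}))
#     loader.insert([], use_backup=True)  # retries only previously failed records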