def insert()

in jobs/fxci-taskcluster-export/fxci_etl/loaders/bigquery.py


    def insert(self, records: list[Record] | Record, use_backup: bool = True):
        """Insert records into the table.

        Args:
            records (list[Record] | Record): Record or list of records to
                insert into the table.
            use_backup (bool): Whether to retry previously failed records
                from the backup blob and to store new failures back to it.
                Defaults to True.
        """
        if isinstance(records, Record):
            records = [records]
        else:
            # Copy so appending backup records below doesn't mutate the
            # caller's list.
            records = list(records)

        if use_backup:
            try:
                # Load previously failed records from storage in case the
                # earlier failure was transient.
                for obj in json.loads(self._record_backup.download_as_string()):
                    records.append(Record.from_dict(obj))
            except NotFound:
                pass

        logger.info(f"{len(records)} records to insert into table '{self.table_name}'")

        # There's a 10MB limit on the `insert_rows` request, so submit rows in
        # batches to avoid exceeding it.
        errors = []
        failed_records = []
        for batch in batched(records, self.CHUNK_SIZE):
            logger.debug(f"Inserting batch of {len(batch)} records")
            batch_errors = self.client.insert_rows(
                self.table, [asdict(row) for row in batch], retry=False
            )
            errors.extend(batch_errors)
            # `insert_rows` reports each failure as a mapping whose `index`
            # key identifies the row within this batch; map the indices back
            # to the records so they can be retried later.
            failed_records.extend(batch[error["index"]] for error in batch_errors)

        if errors:
            logger.error("The following insert errors occurred:")
            pprint(errors)

        num_inserted = len(records) - len(failed_records)
        logger.info(
            f"Successfully inserted {num_inserted} records in table '{self.table_name}'"
        )

        if use_backup:
            # Store the failed records themselves (not the error payloads) so
            # they can be parsed by `Record.from_dict` on the next run.
            self._record_backup.upload_from_string(
                json.dumps([asdict(record) for record in failed_records])
            )
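
A note on `batched`: this presumably comes from `itertools.batched`, which only exists on Python 3.12+. A minimal fallback sketch for older interpreters, using only the standard library (an assumption; the module may instead pin Python 3.12 or define its own helper):

from itertools import islice

try:
    from itertools import batched  # available on Python 3.12+
except ImportError:

    def batched(iterable, n):
        # Yield successive tuples of at most `n` items, mirroring the
        # behavior of itertools.batched on older Python versions.
        if n < 1:
            raise ValueError("n must be at least one")
        it = iter(iterable)
        while chunk := tuple(islice(it, n)):
            yield chunk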
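
The backup round-trip relies on `asdict(record)` and `Record.from_dict` being inverses of each other. A self-contained sketch of that contract, with illustrative fields only (the real `Record` schema and `from_dict` are defined elsewhere in fxci_etl):

import json
from dataclasses import asdict, dataclass

@dataclass
class Record:
    # Illustrative fields; the actual schema lives elsewhere in the project.
    task_id: str
    state: str

    @classmethod
    def from_dict(cls, data: dict) -> "Record":
        # Inverse of dataclasses.asdict for the backup blob round-trip.
        return cls(**data)

# Failed records are serialized on upload...
payload = json.dumps([asdict(Record(task_id="abc123", state="failed"))])
# ...and deserialized at the start of the next insert() call.
restored = [Record.from_dict(obj) for obj in json.loads(payload)]
assert restored[0].task_id == "abc123"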