def write_results_to_bigquery()

in components/specialized-parser/src/runner.py [0:0]


    def write_results_to_bigquery(self, bucket_name: str, csv_blob_name: str):
        # Build a reference to the destination table from environment variables.
        table_ref = self.bq_client.get_dataset(
            f"{os.environ['PROCESSED_DOCS_BQ_PROJECT']}.{os.environ['PROCESSED_DOCS_BQ_DATASET']}"
        ).table(os.environ["PROCESSED_DOCS_BQ_TABLE"])

        # Configure the load job.
        job_config = bigquery.LoadJobConfig(
            write_disposition="WRITE_APPEND",  # Append data to the table
            source_format=bigquery.SourceFormat.CSV,
            autodetect=False,
            skip_leading_rows=1,
            schema=[
                bigquery.SchemaField("id", "STRING"),
                bigquery.SchemaField("original_filename", "STRING"),
                bigquery.SchemaField("results_file", "STRING"),
                bigquery.SchemaField("run_id", "STRING"),
                bigquery.SchemaField("entities", "JSON"),
            ],
        )

        # Construct the URI for the CSV file in GCS.
        uri = f"gs://{bucket_name}/{csv_blob_name}"

        # Create and run the load job.
        load_job = self.bq_client.load_table_from_uri(
            uri, table_ref, job_config=job_config
        )
        load_job.result()  # Wait for the job to complete
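
Because autodetect is disabled, the staging CSV has to match the declared schema exactly: a header row (skipped via skip_leading_rows=1) followed by the five columns above, with the entities column serialized as a JSON string. The snippet below is a minimal, illustrative sketch of producing one such row; the document IDs, paths, run ID, and entity fields are made-up placeholders, not values from this repository.

    # Illustrative only: builds one CSV row compatible with the schema above.
    # Column names come from the load job config; all values are placeholders.
    import csv
    import io
    import json

    buf = io.StringIO()
    writer = csv.writer(buf)
    # Header row, ignored by the load job because of skip_leading_rows=1.
    writer.writerow(["id", "original_filename", "results_file", "run_id", "entities"])
    writer.writerow([
        "doc-0001",                                    # id
        "invoice.pdf",                                 # original_filename
        "gs://example-bucket/results/doc-0001.json",   # results_file (placeholder URI)
        "run-2024-01-01",                              # run_id
        json.dumps({"supplier_name": "ACME Corp"}),    # entities, encoded as a JSON string
    ])
    print(buf.getvalue())

Uploading content like this to the given bucket_name/csv_blob_name object and then calling write_results_to_bigquery appends the row to the configured table.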