in components/specialized-parser/src/runner.py [0:0]
def write_results_to_bigquery(self, bucket_name: str, csv_blob_name: str) -> None:
    # Build the fully qualified table ID directly; load_table_from_uri
    # accepts a "project.dataset.table" string, so no get_dataset() API
    # call is needed just to construct the reference.
    table_id = (
        f"{os.environ['PROCESSED_DOCS_BQ_PROJECT']}"
        f".{os.environ['PROCESSED_DOCS_BQ_DATASET']}"
        f".{os.environ['PROCESSED_DOCS_BQ_TABLE']}"
    )
    # Configure the load job: append rows from a headered CSV whose
    # columns match the explicit schema below. Autodetect stays off so
    # the entities column is loaded as JSON rather than inferred.
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        source_format=bigquery.SourceFormat.CSV,
        autodetect=False,
        skip_leading_rows=1,  # Skip the CSV header row.
        schema=[
            bigquery.SchemaField("id", "STRING"),
            bigquery.SchemaField("original_filename", "STRING"),
            bigquery.SchemaField("results_file", "STRING"),
            bigquery.SchemaField("run_id", "STRING"),
            bigquery.SchemaField("entities", "JSON"),
        ],
    )
    # Point the load job at the CSV file in GCS.
    uri = f"gs://{bucket_name}/{csv_blob_name}"
    # Start the load job and block until it completes; result() raises
    # if the job finished with errors.
    load_job = self.bq_client.load_table_from_uri(
        uri, table_id, job_config=job_config
    )
    load_job.result()  # Wait for the job to complete.
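
On failure, LoadJob.result() raises, and the job's errors attribute lists per-row
problems such as a malformed value in the JSON entities column. A minimal,
self-contained sketch of how a caller might surface those details; the client
construction, placeholder GCS URI, and try/except wrapper here are illustrative
assumptions, not code from this repo:

import os

from google.api_core.exceptions import BadRequest
from google.cloud import bigquery

client = bigquery.Client()
table_id = (
    f"{os.environ['PROCESSED_DOCS_BQ_PROJECT']}"
    f".{os.environ['PROCESSED_DOCS_BQ_DATASET']}"
    f".{os.environ['PROCESSED_DOCS_BQ_TABLE']}"
)
job = client.load_table_from_uri(
    "gs://example-bucket/run-0/results.csv",  # Placeholder URI, not a repo path.
    table_id,
    job_config=bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
    ),
)
try:
    job.result()  # Blocks; raises if the load job failed.
except BadRequest:
    # job.errors holds one dict per problem, with keys like "message"
    # and "location" describing the failing row or column.
    for err in job.errors or []:
        print(err["message"])
    raise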