in document-ai/code/main.py [0:0]
import json
import os

from google.cloud import bigquery, storage


def process_output(bucket_name, object_name, document_text, summary_text, document_dict):
    """Saves extraction results to the output bucket, moves the source
    object from the input to the output bucket, and loads the JSON
    results into BigQuery.
    """
    print("Process output started.")
    storage_client = storage.Client()
    destination_bucket_name = os.environ["GCS_OUTPUT"]
    destination_bucket = storage_client.bucket(destination_bucket_name)
    # Save raw text and summary results
    print("Saving raw results into the output bucket...")
    results_text_name = "{}.text".format(object_name)
    results_text_blob = destination_bucket.blob(results_text_name)
    results_text_blob.upload_from_string(document_text)
    print("Saving summary results into the output bucket...")
    results_summary_name = "{}.summary".format(object_name)
    results_summary_blob = destination_bucket.blob(results_summary_name)
    results_summary_blob.upload_from_string(summary_text)
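    # upload_from_string defaults to content_type="text/plain", which is
    # appropriate for these raw-text artifacts.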
print("Saving json results into the output bucket...")
results_json = {
"document_file_name": object_name,
"document_content": document_dict,
"document_summary": summary_text
}
results_json = json.dumps(results_json)
results_json_name = "{}.json".format(object_name)
results_json_blob = destination_bucket.blob(results_json_name)
results_json_blob.upload_from_string(results_json)
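    # Note: json.dumps writes the whole object on a single line, so the
    # saved .json file is already valid newline-delimited JSON for the
    # BigQuery load job below.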
    # Move object from input to output bucket
    print("Moving object {} from {} to {}".format(object_name, bucket_name, destination_bucket_name))
    source_bucket = storage_client.bucket(bucket_name)
    source_blob = source_bucket.blob(object_name)
    source_bucket.copy_blob(source_blob, destination_bucket, object_name)
    source_bucket.delete_blob(object_name)
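    # Cloud Storage has no atomic move across buckets, so copy-then-delete
    # is the standard pattern; if the delete fails after a successful copy,
    # the object will exist in both buckets.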
    # Persist results into BigQuery
    print("Persisting data to BigQuery...")
    bq_client = bigquery.Client()
    # BQ_TABLE_ID is expected in "project.dataset.table" form.
    table_id = os.getenv("BQ_TABLE_ID")
    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("document_file_name", "STRING"),
            bigquery.SchemaField("document_content", "JSON"),
            bigquery.SchemaField("document_summary", "STRING"),
        ],
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    )
    uri = "gs://{}/{}".format(destination_bucket_name, results_json_name)
    print("Load file {} into BigQuery".format(uri))
    load_job = bq_client.load_table_from_uri(
        uri,
        table_id,
        location=os.getenv("BQ_LOCATION"),  # Must match the destination dataset location.
        job_config=job_config,
    )
    load_job.result()  # Blocks until the load job finishes; raises on failure.
    print("Process output completed.")