in components/specialized-parser/src/runner.py [0:0]
def run(self):
logger.info("Verifying AlloyDB output table")
self.verify_alloydb_table()
logger.info("Starting Batch Processor operation")
batch_operation = self.call_batch_processor()
logger.info("Waiting for Batch operation to finish")
individual_process_statuses = self.wait_for_completion_and_verify_success(
batch_operation
)
logger.info(f"Parsing results from {self.job_config.gcs_output_uri}")
parsed_results, filename_pairs = self.read_and_parse_batch_results(
individual_process_statuses,
)
logger.info("Writing metadata to bigquery")
self.write_metadata_to_bigquery(filename_pairs)
if not parsed_results:
logger.info("No parsed results from processor - only metadata")
else:
logger.info("Writing results to GCS")
bucket_name, csv_blob_name = self.write_results_to_gcs(parsed_results)
logger.info("Writing results to AlloyDB")
self.write_results_to_alloydb_with_inserts(parsed_results)
logger.info("Writing results to BigQuery")
self.write_results_to_bigquery(bucket_name, csv_blob_name)
self.alloydb_connection_pool.dispose()
logger.info("Done")