in components/dpu-workflow/src/docs_processing_orchestrator.py [0:0]
def generate_process_job_params(**context):
    """Assemble parameters for the Cloud Run document-processing job.

    Pulls the file-move parameters produced by the
    ``generate_files_move_parameters`` task and the target BigQuery table
    from XCom, reads job configuration from the environment, and delegates
    to ``cloud_run_utils.get_process_job_params`` to build the job params.

    Raises:
        AirflowSkipException: when the upstream task produced no files to
            process, so this task (and its downstream work) can be skipped.
    """
    task_instance = context["ti"]

    file_move_params = task_instance.xcom_pull(
        key="return_value",
        task_ids="initial_load_from_input_bucket.generate_files_move_parameters",
    )
    # Guard clause: nothing to move means nothing to process — skip the task.
    if not file_move_params:
        logging.warning(
            "No need to run, since generate_files_move_parameters "
            "did not generate any files to process"
        )
        raise AirflowSkipException()

    target_table = task_instance.xcom_pull(key="bigquery_table")

    # Job configuration comes from the deployment environment.
    job_name = os.environ.get("DOC_PROCESSOR_JOB_NAME")
    reject_bucket = os.environ.get("DPU_REJECT_BUCKET")

    # Map each supported file suffix to the processor that handles it.
    suffix_to_processor = {
        entry["file-suffix"]: entry["processor"]
        for entry in context["params"]["supported_files"]
    }

    return cloud_run_utils.get_process_job_params(
        target_table,
        job_name,
        reject_bucket,
        file_move_params,
        suffix_to_processor,
    )