in components/dpu-workflow/src/docs_processing_orchestrator.py [0:0]
import os

# cloud_run_utils is a helper module that lives alongside this file; the
# exact import path is an assumption for this excerpt.
import cloud_run_utils


def generate_specialized_process_job_params(**context):
    # Labels detected by the upstream PDF-classification task group.
    detected_labels = context["ti"].xcom_pull(
        task_ids="classify_pdfs.parse_doc_classifier_results_and_move_files",
        key="return_value",
    )
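    # Example shape (illustrative labels only): ["form", "invoice"]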
    # Restrict the Doc AI processors configured for this run to those whose
    # label was actually detected.
    possible_processors = {
        x["label"]: x["doc-ai-processor-id"]
        for x in context["params"]["doc-ai-processors"]
        if x["label"] in detected_labels
    }
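    # Expected DAG-params shape (values illustrative only):
    #   "doc-ai-processors": [
    #       {"label": "invoice", "doc-ai-processor-id": "projects/.../processors/..."},
    #   ]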
    # The per-run process folder created at ingestion; it also serves as the
    # run identifier passed to the parser job below.
    process_folder = context["ti"].xcom_pull(
        task_ids="initial_load_from_input_bucket.create_process_folder",
        key="process_folder",
    )
    # Fully qualified BigQuery table id (<project_id>.<dataset_id>.<table_id>)
    # pushed to XCom by an upstream task.
    bq_table = context["ti"].xcom_pull(key="bigquery_table")
    # Runtime configuration from the environment.
    process_bucket = os.environ["DPU_PROCESS_BUCKET"]
    job_name = os.environ.get("SPECIALIZED_PARSER_JOB_NAME", "specialized-parser")
    # Assemble Cloud Run job parameters for the detected processors; the
    # process folder name doubles as the run id.
    specialized_parser_job_params_list = cloud_run_utils.specialized_parser_job_params(
        possible_processors=possible_processors,
        job_name=job_name,
        run_id=process_folder,
        bq_table=bq_table,
        process_bucket=process_bucket,
        process_folder=process_folder,
    )
    return specialized_parser_job_params_list
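

# ---------------------------------------------------------------------------
# Illustrative sketch only, not part of the original file: one plausible way
# to consume the returned list is Airflow dynamic task mapping, launching one
# Cloud Run job execution per parameter dict. The dag id, task ids, project,
# and region below are assumptions, and expand_kwargs only works if the dicts
# produced by cloud_run_utils.specialized_parser_job_params use keys that
# match the operator's keyword arguments.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.cloud_run import (
    CloudRunExecuteJobOperator,
)

with DAG(dag_id="specialized_parser_sketch", start_date=datetime(2024, 1, 1)):
    generate_params = PythonOperator(
        task_id="generate_specialized_process_job_params",
        python_callable=generate_specialized_process_job_params,
    )

    # One mapped Cloud Run execution per dict in the returned list.
    CloudRunExecuteJobOperator.partial(
        task_id="run_specialized_parser",
        project_id="my-project",  # hypothetical value
        region="us-central1",  # hypothetical value
    ).expand_kwargs(generate_params.output)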