def generate_process_job_params()

in components/dpu-workflow/src/docs_processing_orchestrator.py
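
Airflow task callable that assembles the parameters for the document-processing Cloud Run job. It pulls the file-move parameters produced by the upstream generate_files_move_parameters task from XCom, skips the run when there is nothing to process, and otherwise combines the target BigQuery table, the Cloud Run job name, the reject bucket, and a suffix-to-processor map into job parameters via cloud_run_utils.get_process_job_params.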


import logging
import os

from airflow.exceptions import AirflowSkipException

import cloud_run_utils  # sibling module in the same src directory; import path assumed


def generate_process_job_params(**context):
    # Pull the file-move parameters produced by the upstream task; Airflow
    # stores a task's return value in XCom under the "return_value" key.
    mv_params = context["ti"].xcom_pull(
        key="return_value",
        task_ids="initial_load_from_input_bucket.generate_files_move_parameters",
    )
    if not mv_params:
        # Nothing to process: skip this task rather than fail the DAG run.
        logging.warning(
            "No need to run, since generate_files_move_parameters "
            "did not generate any files to process"
        )
        raise AirflowSkipException()
    # Destination BigQuery table, pushed to XCom by an earlier task.
    bq_table = context["ti"].xcom_pull(key="bigquery_table")
    # Cloud Run job name and reject bucket are injected via environment variables.
    doc_processor_job_name = os.environ.get("DOC_PROCESSOR_JOB_NAME")
    gcs_reject_bucket = os.environ.get("DPU_REJECT_BUCKET")
    # Map each supported file suffix to its configured processor from DAG params.
    supported_files = {
        x["file-suffix"]: x["processor"] for x in context["params"]["supported_files"]
    }
    process_job_params = cloud_run_utils.get_process_job_params(
        bq_table, doc_processor_job_name, gcs_reject_bucket, mv_params, supported_files
    )
    return process_job_params
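
For context, a minimal sketch (not taken from the source file) of how a callable like this is typically registered in a DAG with a PythonOperator; the dag_id, schedule, and start date below are assumptions for illustration:

import pendulum

from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical wiring; only generate_process_job_params comes from the file above.
with DAG(
    dag_id="docs_processing",  # assumed DAG id, not from the source
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
) as dag:
    generate_params = PythonOperator(
        task_id="generate_process_job_params",
        python_callable=generate_process_job_params,
    )

Because PythonOperator pushes the callable's return value to XCom under the "return_value" key, a downstream task that triggers the Cloud Run processing job can pull process_job_params the same way mv_params is pulled above.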