# generate_specialized_process_job_params
#
# from components/dpu-workflow/src/docs_processing_orchestrator.py

def generate_specialized_process_job_params(**context):
    """Build the Cloud Run job parameter list for the specialized parsers.

    Pulls the document labels detected by the classifier task and the
    process-folder / BigQuery-table values pushed by earlier tasks, then
    delegates to ``cloud_run_utils.specialized_parser_job_params`` to
    assemble one job-parameter entry per applicable processor.

    Args:
        **context: Airflow task context (provides ``ti`` for XCom access
            and ``params`` for the DAG run configuration).

    Returns:
        The list of specialized-parser job parameter objects produced by
        ``cloud_run_utils.specialized_parser_job_params``.
    """
    # Labels detected by the classifier; fall back to an empty list when the
    # upstream task pushed nothing, so the membership test below cannot
    # raise TypeError on None.
    detected_labels = (
        context["ti"].xcom_pull(
            task_ids="classify_pdfs.parse_doc_classifier_results_and_move_files",
            key="return_value",
        )
        or []
    )

    # Restrict the configured processors to those whose label was detected.
    possible_processors = {
        x["label"]: x["doc-ai-processor-id"]
        for x in context["params"]["doc-ai-processors"]
        if x["label"] in detected_labels
    }

    # Process folder created at the start of the run; doubles as the run id
    # passed to the parser job below.
    process_folder = context["ti"].xcom_pull(
        task_ids="initial_load_from_input_bucket.create_process_folder",
        key="process_folder",
    )

    # Fully-qualified BigQuery table id: <project_id>.<dataset_id>.<table_id>
    bq_table = context["ti"].xcom_pull(key="bigquery_table")

    process_bucket = os.environ["DPU_PROCESS_BUCKET"]
    job_name = os.environ.get("SPECIALIZED_PARSER_JOB_NAME", "specialized-parser")
    specialized_parser_job_params_list = cloud_run_utils.specialized_parser_job_params(
        possible_processors=possible_processors,
        job_name=job_name,
        run_id=process_folder,
        bq_table=bq_table,
        process_bucket=process_bucket,
        process_folder=process_folder,
    )
    return specialized_parser_job_params_list