# generate_classify_job_params_fn
#
# in components/dpu-workflow/src/docs_processing_orchestrator.py [0:0]


def generate_classify_job_params_fn(**context):
    """Build Cloud Run job overrides for the document-classifier step.

    Reads the classifier processor id from the DAG params and the upstream
    XCom results (supported file types and process folder), then returns the
    Cloud Run job override payload via ``cloud_run_utils``.

    Raises:
        AirflowSkipException: when the classifier id is missing/invalid or
            there are no PDF files to classify (the step is skipped).
        ValueError: when the ``DPU_PROCESS_BUCKET`` environment variable is
            not set.
    """
    classifier_id = context["params"]["classifier"]
    valid_tuple = is_valid_processor_id(classifier_id)
    if not valid_tuple:
        logging.warning(
            f"Classifier processor id is not specified or not valid. Skipping. {classifier_id=}"
        )
        raise AirflowSkipException()
    files_to_process = context["ti"].xcom_pull(
        task_ids="initial_load_from_input_bucket.process_supported_types",
        key="types_to_process",
    )
    # xcom_pull returns None when the upstream task produced nothing (e.g. it
    # was skipped); guard so we skip cleanly instead of raising TypeError.
    if not files_to_process or "pdf" not in files_to_process:
        logging.warning("No PDF files to classify, skipping the classify step.")
        raise AirflowSkipException()

    process_folder = context["ti"].xcom_pull(
        task_ids="initial_load_from_input_bucket.create_process_folder",
        key="process_folder",
    )
    process_bucket = os.environ.get("DPU_PROCESS_BUCKET")
    # Explicit raise instead of assert: asserts are stripped under `python -O`,
    # which would let a missing bucket slip through to the Cloud Run call.
    if process_bucket is None:
        raise ValueError("DPU_PROCESS_BUCKET is not set")

    return cloud_run_utils.get_doc_classifier_job_overrides(
        classifier_project_id=valid_tuple[0],
        classifier_location=valid_tuple[1],
        classifier_processor_id=valid_tuple[2],
        process_folder=process_folder,
        process_bucket=process_bucket,
    )