in components/dpu-workflow/src/docs_processing_orchestrator.py [0:0]
def generate_classify_job_params_fn(**context):
    """Build Cloud Run job-override parameters for the document classifier step.

    Reads the classifier processor id from the DAG params and the list of file
    types plus the process folder from upstream XComs, then delegates to
    ``cloud_run_utils.get_doc_classifier_job_overrides``.

    Args:
        **context: Airflow task context; expects ``params`` (with a
            ``classifier`` entry) and ``ti`` for XCom pulls.

    Returns:
        The job-override mapping produced by
        ``cloud_run_utils.get_doc_classifier_job_overrides``.

    Raises:
        AirflowSkipException: If the classifier processor id is missing or
            invalid, or there are no PDF files to classify.
        ValueError: If the ``DPU_PROCESS_BUCKET`` environment variable is unset.
    """
    classifier_id = context["params"]["classifier"]
    # Expected to be (project_id, location, processor_id) when valid,
    # falsy otherwise — TODO confirm against is_valid_processor_id.
    valid_tuple = is_valid_processor_id(classifier_id)
    if not valid_tuple:
        # Lazy %-formatting: the message is only built if the record is emitted.
        logging.warning(
            "Classifier processor id is not specified or not valid. "
            "Skipping. classifier_id=%r",
            classifier_id,
        )
        raise AirflowSkipException()

    files_to_process = context["ti"].xcom_pull(
        task_ids="initial_load_from_input_bucket.process_supported_types",
        key="types_to_process",
    )
    # xcom_pull returns None when the upstream task pushed nothing; treat that
    # the same as "no PDFs" rather than raising TypeError on the `in` check.
    if not files_to_process or "pdf" not in files_to_process:
        logging.warning("No PDF files to classify, skipping the classify step.")
        raise AirflowSkipException()

    process_folder = context["ti"].xcom_pull(
        task_ids="initial_load_from_input_bucket.create_process_folder",
        key="process_folder",
    )
    process_bucket = os.environ.get("DPU_PROCESS_BUCKET")
    # `assert` is stripped under `python -O`, which would silently disable this
    # check — validate with an explicit raise instead.
    if process_bucket is None:
        raise ValueError("DPU_PROCESS_BUCKET is not set")

    return cloud_run_utils.get_doc_classifier_job_overrides(
        classifier_project_id=valid_tuple[0],
        classifier_location=valid_tuple[1],
        classifier_processor_id=valid_tuple[2],
        process_folder=process_folder,
        process_bucket=process_bucket,
    )