def move_duplicated_files_to_rejected_bucket_fn()

in components/dpu-workflow/src/docs_processing_orchestrator.py

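Airflow task callable that runs after the duplicate-check job: it pulls the check's output folder, the run's process folder, and the per-type map of files to process from XCom, moves the files flagged as duplicates into the reject bucket named by DPU_REJECT_BUCKET, and returns the map with now-empty file types pruned. Because the callable returns a value, Airflow pushes the pruned map to XCom under the return_value key, so downstream tasks see only file types that still have work.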

import os

import gcs_utils  # project helper module; the exact import path in the repo may differ


def move_duplicated_files_to_rejected_bucket_fn(**context):
    # Folder holding the duplicate-check output (result.jsonl), published to
    # XCom by the generate_check_duplicated_files_job_params task.
    output_folder = context["ti"].xcom_pull(
        task_ids="initial_load_from_input_bucket.generate_check_duplicated_files_job_params",
        key="output_folder",
    )
    # Run-scoped folder name; rejected files are grouped under it in the
    # reject bucket.
    process_folder = context["ti"].xcom_pull(
        task_ids="initial_load_from_input_bucket.create_process_folder",
        key="process_folder",
    )
    # Mapping of file type -> list of files queued for processing.
    process_files_by_type = context["ti"].xcom_pull(
        task_ids="initial_load_from_input_bucket.process_supported_types",
        key="types_to_process",
    )
    # Move the files flagged as duplicates in result.jsonl to the reject
    # bucket; process_files_by_type is passed in so the moved entries can be
    # removed from the in-memory mapping as well.
    gcs_utils.move_duplicated_files(
        f"{output_folder}/result.jsonl",
        f'{os.environ.get("DPU_REJECT_BUCKET")}/{process_folder}',
        process_files_by_type,
    )
    # Prune file types whose lists are now empty; iterate over a copied key
    # list because the dict is mutated during iteration.
    for key in list(process_files_by_type):
        if not process_files_by_type[key]:
            del process_files_by_type[key]
    return process_files_by_type
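
For context, a minimal sketch of how this callable could be registered in the DAG. The dag_id, schedule, and task_id below are illustrative assumptions, not taken from docs_processing_orchestrator.py; in Airflow 2.x the task context is injected into the python_callable automatically, so no extra flag is needed.

import pendulum

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="docs_processing_orchestrator",  # assumed dag_id
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,  # assumed: triggered externally rather than on a cron
    catchup=False,
) as dag:
    move_duplicated_files_to_rejected_bucket = PythonOperator(
        task_id="move_duplicated_files_to_rejected_bucket",  # assumed task_id
        python_callable=move_duplicated_files_to_rejected_bucket_fn,
    )

Because the callable returns process_files_by_type, a downstream task could read the pruned map with context["ti"].xcom_pull(task_ids="move_duplicated_files_to_rejected_bucket"), which fetches the default return_value XCom.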