in components/dpu-workflow/src/docs_processing_orchestrator.py [0:0]
def move_duplicated_files_to_rejected_bucket_fn(**context):
    """Relocate duplicate files to the reject bucket and report what remains.

    Pulls the duplicate-check output folder, the process folder, and the
    per-type file map from upstream tasks via XCom, asks ``gcs_utils`` to
    move the duplicates listed in ``result.jsonl`` into
    ``$DPU_REJECT_BUCKET/<process_folder>``, then drops any file-type
    entries left empty and returns the pruned map (pushed as this task's
    XCom return value).
    """
    ti = context["ti"]
    dedup_output_dir = ti.xcom_pull(
        task_ids="initial_load_from_input_bucket.generate_check_duplicated_files_job_params",
        key="output_folder",
    )
    process_dir = ti.xcom_pull(
        task_ids="initial_load_from_input_bucket.create_process_folder",
        key="process_folder",
    )
    files_by_type = ti.xcom_pull(
        task_ids="initial_load_from_input_bucket.process_supported_types",
        key="types_to_process",
    )

    reject_destination = f'{os.environ.get("DPU_REJECT_BUCKET")}/{process_dir}'
    gcs_utils.move_duplicated_files(
        f"{dedup_output_dir}/result.jsonl",
        reject_destination,
        files_by_type,
    )

    # Prune in place: the move above receives this dict, so keep the same
    # object and only delete the type buckets that ended up empty.
    empty_types = [t for t, files in files_by_type.items() if not files]
    for t in empty_types:
        del files_by_type[t]
    return files_by_type