def _download_files()

in src/translation/dags/batch_sql_translation.py [0:0]


def _download_files(ti):
    config = ti.xcom_pull(key="config", task_ids="create_translation_workflow")
    file_list = ti.xcom_pull(
        key="file_names", task_ids="get_successfully_translated_files"
    )

    translation_target_path = config["migrationTask"]["translationConfigDetails"][
        "gcsTargetPath"
    ]
    bucket, folder = gcs_util.parse_bucket_and_blob_from_path(translation_target_path)

    logging.info(file_list)
    if len(file_list) == 0:
        logging.info("SQL File list is empty and there are no SQL files to download")
    else:
        gcs_hook = GCSHook()
        for filename in file_list:
            local_filename = f"/home/airflow/gcs/data/{filename}"
            object_name = f"{folder}/{filename}"
            # make sure local folder exists before downloading
            local_folder, _ = os.path.split(local_filename)
            if not os.path.exists(local_folder):
                os.makedirs(local_folder)
            # download file
            gcs_hook.download(
                bucket_name=bucket,
                object_name=object_name,
                filename=local_filename,
            )
        logging.info(os.listdir("/home/airflow/gcs/data"))