in src/translation/dags/batch_sql_translation.py [0:0]
def _download_files(ti):
config = ti.xcom_pull(key="config", task_ids="create_translation_workflow")
file_list = ti.xcom_pull(
key="file_names", task_ids="get_successfully_translated_files"
)
translation_target_path = config["migrationTask"]["translationConfigDetails"][
"gcsTargetPath"
]
bucket, folder = gcs_util.parse_bucket_and_blob_from_path(translation_target_path)
logging.info(file_list)
if len(file_list) == 0:
logging.info("SQL File list is empty and there are no SQL files to download")
else:
gcs_hook = GCSHook()
for filename in file_list:
local_filename = f"/home/airflow/gcs/data/{filename}"
object_name = f"{folder}/{filename}"
# make sure local folder exists before downloading
local_folder, _ = os.path.split(local_filename)
if not os.path.exists(local_folder):
os.makedirs(local_folder)
# download file
gcs_hook.download(
bucket_name=bucket,
object_name=object_name,
filename=local_filename,
)
logging.info(os.listdir("/home/airflow/gcs/data"))