in src/translation/dags/batch_sql_translation.py [0:0]
def _get_successfully_translated_files(ti) -> None:
    """Publish the translated files that are not listed as failed.

    Pulls the "files" XCom from the ``get_all_translated_files`` and
    ``get_failed_files_from_csv`` tasks and the "config" XCom from
    ``create_translation_workflow``, then pushes two XComs:

    * ``files``: every path in the first list that is absent from the second.
    * ``file_names``: those paths made relative to the folder parsed from the
      config's ``gcsTargetPath``. Paths whose relative parent folder equals
      ``DDL_FOLDER`` are left out unless ``is_ddl_run(config)`` is truthy.

    Both lists are pushed in sorted order so the XCom values are stable
    across runs and retries.

    Args:
        ti: Task instance used for ``xcom_pull`` / ``xcom_push``.

    Raises:
        Exception: If no file remains after removing the failed ones.
    """
    all_files, failed_files = ti.xcom_pull(
        key="files",
        task_ids=["get_all_translated_files", "get_failed_files_from_csv"],
    )
    config = ti.xcom_pull(key="config", task_ids="create_translation_workflow")
    target_path = config["migrationTask"]["translationConfigDetails"]["gcsTargetPath"]
    _, target_folder = gcs_util.parse_bucket_and_blob_from_path(target_path)

    # Set iteration order is arbitrary for strings; sorting makes the pushed
    # values deterministic. ``difference`` accepts any iterable directly.
    succeeded = sorted(set(all_files).difference(failed_files))
    if not succeeded:
        raise Exception("No files were successfully translated.")
    ti.xcom_push(key="files", value=succeeded)

    # The run type depends only on the config, so evaluate it once instead of
    # once per file.
    include_ddl = is_ddl_run(config)
    relative_paths = (
        os.path.relpath(path, start=target_folder) for path in succeeded
    )
    file_names = [
        relative_path
        for relative_path in relative_paths
        if include_ddl or os.path.dirname(relative_path) != DDL_FOLDER
    ]
    ti.xcom_push(key="file_names", value=file_names)