in src/translation/dags/batch_sql_translation.py [0:0]
def _get_failed_files_from_csv(ti) -> None:
    """Parses the CSV error report in the GCS target folder, collects the
    target paths of files whose errors are not excluded by the configured
    filter rules, and pushes the results to XCom ("files" and "errors")."""
    config = ti.xcom_pull(key="config", task_ids="create_translation_workflow")
    error_row_filter_rules = config.get(ERROR_FILTER_KEY)
    error_row_filter = csv_utils.rules_filter(error_row_filter_rules)
    source_path = config["migrationTask"]["translationConfigDetails"]["gcsSourcePath"]
    target_path = config["migrationTask"]["translationConfigDetails"]["gcsTargetPath"]
    target_bucket, target_folder = gcs_util.parse_bucket_and_blob_from_path(target_path)
    gcs_hook = GCSHook()
    report_object_name = storage_utils.append_blob_name_to_path(
        target_folder, REPORT_FILENAME
    )
    report = gcs_hook.download_as_byte_array(target_bucket, report_object_name).decode(
        "utf-8"
    )
    csv_reader = csv.reader(report.splitlines())
    failed_files = set()
    errors = []
    headers = next(csv_reader)  # The first row of the report is the header row.
    logging.info("scanning failed files:")
    for row in csv_reader:
        row_dict = csv_utils.row_to_dict(headers, row)
        # Prefer FilePath (made relative to the GCS source path) and fall back
        # to FileName, e.g. a FilePath of "gs://bucket/input/sub/a.sql" with a
        # source path of "gs://bucket/input" yields "sub/a.sql".
        if row_dict["FilePath"]:
            file_path = os.path.relpath(row_dict["FilePath"], start=source_path)
        elif row_dict["FileName"]:
            file_path = row_dict["FileName"]
        else:
            logging.info("File path and name are both blank in the CSV row")
            continue  # Skip rows that identify no file at all.
        failed_file_path = f"{target_folder}/{file_path}"
        if not error_row_filter or not error_row_filter(row_dict):
            failed_files.add(failed_file_path)
            logging.info(f"{failed_file_path} contains errors")
            errors.append(stats_utils.csv_row_to_record(row_dict, None, False))
        else:
            # The row matched a filter rule: record it as filtered instead of
            # marking the file as failed.
            logging.info(f"{failed_file_path} contains errors (filtered)")
            errors.append(stats_utils.csv_row_to_record(row_dict, None, True))
    ti.xcom_push(key="files", value=list(failed_files))
    ti.xcom_push(key="errors", value=errors)