def _get_failed_files_from_csv()

in src/translation/dags/batch_sql_translation.py [0:0]


def _get_failed_files_from_csv(ti) -> None:
    config = ti.xcom_pull(key="config", task_ids="create_translation_workflow")
    error_row_filter_rules = (
        config[ERROR_FILTER_KEY] if ERROR_FILTER_KEY in config else None
    )
    error_row_filter = csv_utils.rules_filter(error_row_filter_rules)

    source_path = config["migrationTask"]["translationConfigDetails"]["gcsSourcePath"]
    target_path = config["migrationTask"]["translationConfigDetails"]["gcsTargetPath"]
    target_bucket, target_folder = gcs_util.parse_bucket_and_blob_from_path(target_path)

    gcs_hook = GCSHook()
    report_object_name = storage_utils.append_blob_name_to_path(
        target_folder, REPORT_FILENAME
    )
    report = gcs_hook.download_as_byte_array(target_bucket, report_object_name).decode(
        "utf-8"
    )
    csv_reader = csv.reader(report.splitlines())

    failed_files = set()
    errors = []
    headers = next(csv_reader)
    logging.info("scanning failed files:")
    for row in csv_reader:
        row_dict = csv_utils.row_to_dict(headers, row)

        file_path = ""
        if row_dict["FilePath"] is not None and row_dict["FilePath"] != "":
            file_path = os.path.relpath(row_dict["FilePath"], start=source_path)
        elif row_dict["FileName"] is not None and row_dict["FileName"] != "":
            file_path = row_dict["FileName"]
        else:
            logging.info("File path and name is blank in CSV file")
            continue  # If name and path both are blank skip the iteration

        failed_file_path = f"{target_folder}/{file_path}"
        if not error_row_filter or not error_row_filter(row_dict):
            failed_files.add(failed_file_path)
            logging.info(f"{failed_file_path} contains errors")
            errors.append(stats_utils.csv_row_to_record(row_dict, None, False))
        else:
            logging.info(f"{failed_file_path} contains errors (filtered)")
            errors.append(stats_utils.csv_row_to_record(row_dict, None, True))

    ti.xcom_push(key="files", value=list(failed_files))
    ti.xcom_push(key="errors", value=errors)