_process_transfer_logs()

Defined in src/datamigration/dags/redshift/redshift_transfer_run_log_dag.py [0:0]


def _process_transfer_logs(ti) -> None:
    """
    Processes DTS logs and extracts relevant information for tables' migration
    """
    transfer_config_id = ti.xcom_pull(
        key="transfer_config_id", task_ids="load_parameters"
    )
    transfer_run_id = ti.xcom_pull(key="transfer_run_id", task_ids="load_parameters")
    unique_id = ti.xcom_pull(key="unique_id", task_ids="load_parameters")
    dts_run_summary_json = ti.xcom_pull(
        key="dts_run_summary_json", task_ids="get_transfer_run_summary"
    )
    job_stats_json_template = ti.xcom_pull(
        key="job_stats_json", task_ids="load_parameters"
    )
    dts_run_logs = ti.xcom_pull(key="dts_run_logs", task_ids="get_transfer_run_logs")

    job_stats_json_template["unique_id"] = unique_id
    job_stats_json_template["transfer_run_id"] = transfer_run_id
    job_stats_json_template["transfer_config_id"] = transfer_config_id

    job_stats_jsons = {}

    # TODO: add parsing logic for empty table migration
    # TODO: shorten error messages before inserting to bigquery
    for log_dict in dts_run_logs:
        log_message = log_dict["messageText"]
        try:
            # Parsing error messages
            if log_dict["severity"] == "ERROR":
                job_stats_json_template["job_status"] = "FAILED"
                if log_message.__contains__("Job"):
                    job_msg_match = re.match(r"Job (.*) \(table (.*?)\).*", log_message)
                    if job_msg_match:
                        table_name = job_msg_match.group(2)
                        if table_name not in job_stats_jsons:
                            job_stats_jsons[table_name] = job_stats_json_template.copy()
                        job_stats_jsons[table_name]["src_table_name"] = table_name
                        job_stats_jsons[table_name]["bq_job_id"] = job_msg_match.group(
                            1
                        )
                        job_stats_jsons[table_name]["message"] = (
                            job_stats_jsons[table_name]["message"] + " " + log_message
                        )
                else:
                    if not dts_run_summary_json.get("error_message"):
                        dts_run_summary_json["error_message"] = log_message
                continue

            elif log_dict["severity"] == "INFO":
                job_stats_json_template["job_status"] = "SUCCEEDED"

                if log_message.__contains__("Transfer load"):
                    run_date_match = re.match("Transfer.* ([0-9]{8})", log_message)
                    run_date = run_date_match.group(1)
                    dts_run_summary_json["run_date"] = run_date
                    continue

                # Done only to map source table_name with target table name
                # elif log_message.__contains__("data will be appended to existing table"):
                #     # For target table name
                #     target_summary_match = re.match(
                #         r"Table (.*) already exists in (.*);",
                #         log_message,
                #     )
                #     target_dataset_name = target_summary_match.group(2).split(".")[1]
                #     target_table_name = (
                #         target_dataset_name + "." + target_summary_match.group(1)
                #     )
                #     logging.info(target_table_name)
                #     continue

                # Done
                elif log_message.__contains__("Number of records"):
                    job_summary_match = re.match(
                        r"Job (.*) \(table (.*)\) .* records: (\d*),.* (\d*).",
                        log_message,
                    )
                    if job_summary_match:
                        table_name = job_summary_match.group(2)
                        if table_name not in job_stats_jsons:
                            job_stats_jsons[table_name] = job_stats_json_template.copy()
                        job_stats_jsons[table_name]["src_table_name"] = table_name
                        job_stats_jsons[table_name][
                            "bq_job_id"
                        ] = job_summary_match.group(1)
                        job_stats_jsons[table_name][
                            "success_records"
                        ] = job_summary_match.group(3)
                        job_stats_jsons[table_name][
                            "error_records"
                        ] = job_summary_match.group(4)
                    continue
                # Done
                elif log_message.__contains__("Summary:"):
                    run_summary_match = re.match(
                        r"^Summary: succeeded (\d*).*failed (\d*).*", log_message
                    )
                    dts_run_summary_json["succeeded_jobs"] = run_summary_match.group(1)
                    dts_run_summary_json["failed_jobs"] = run_summary_match.group(2)
                    continue

        except Exception as e:
            logging.error(
                f"Method _process_transfer_logs: failed to extract value from log \
                \n\tlog line: {log_message}\n\terror: {e}"
            )

    job_stats_json_rows = []
    for row_dict in job_stats_jsons.values():
        row_dict["run_date"] = run_date

        if not row_dict["message"]:
            row_dict["transfer_run_state"] = "SUCCEEDED"
        else:
            if row_dict["message"].__contains__("Skipping"):
                row_dict["transfer_run_state"] = "SKIPPED"
            else:
                row_dict["transfer_run_state"] = "FAILED"

        if row_dict["src_table_name"]:
            job_stats_json_rows.append(row_dict)

    logging.info(
        f"Method _process_transfer_logs: extracted {len(job_stats_json_rows)} transfer run job rows from logs"
    )

    ti.xcom_push(key="dts_run_summary_json", value=dts_run_summary_json)
    ti.xcom_push(key="job_stats_json_rows", value=job_stats_json_rows)