def _process_transfer_logs()

in src/datamigration/dags/teradata/teradata_transfer_run_log_dag.py [0:0]


def _process_transfer_logs(ti) -> None:
    """
    Processes DTS logs and extracts relevant information for tables' migration.

    Pulls the run parameters (``load_parameters``), the DTS run summary
    (``get_transfer_run_summary``) and the DTS run log entries
    (``get_transfer_run_logs``) from XCom. It walks the log entries once and
    builds one stats row per table from the ``job_stats_json`` template. Then
    it pushes the enriched run summary (key ``dts_run_summary_json``) and the
    list of per-table rows (key ``job_stats_json_rows``) back to XCom.

    Each log entry is read as a dict with ``severity`` and ``messageText``
    keys. A line that fails to parse is logged and skipped, so one malformed
    message does not fail the whole task.

    :param ti: task instance exposing ``xcom_pull`` / ``xcom_push``
        (Airflow-style).
    """
    transfer_config_id = ti.xcom_pull(
        key="transfer_config_id", task_ids="load_parameters"
    )
    transfer_run_id = ti.xcom_pull(key="transfer_run_id", task_ids="load_parameters")
    unique_id = ti.xcom_pull(key="unique_id", task_ids="load_parameters")
    dts_run_summary_json = ti.xcom_pull(
        key="dts_run_summary_json", task_ids="get_transfer_run_summary"
    )
    job_stats_json_template = ti.xcom_pull(
        key="job_stats_json", task_ids="load_parameters"
    )
    # Treat "no logs pulled" as an empty run instead of failing on iteration.
    dts_run_logs = (
        ti.xcom_pull(key="dts_run_logs", task_ids="get_transfer_run_logs") or []
    )

    job_stats_json_template["unique_id"] = unique_id
    job_stats_json_template["transfer_run_id"] = transfer_run_id
    job_stats_json_template["transfer_config_id"] = transfer_config_id

    job_stats_jsons = {}
    # Only set when a "Transfer load" line is parsed. The default keeps the
    # row-building loop below from hitting an unbound name on runs without it.
    run_date = None

    def _row_for(table_name):
        """Return the stats row for ``table_name``, creating it from the template."""
        if table_name not in job_stats_jsons:
            # Shallow snapshot of the template as it is *now*. That includes the
            # current job_status, which is overwritten on every ERROR/INFO line.
            # NOTE(review): a row's job_status therefore reflects the line that
            # created it, not the final run outcome — confirm this is intended.
            job_stats_jsons[table_name] = job_stats_json_template.copy()
        return job_stats_jsons[table_name]

    def _append_message(row, text):
        """Append an error line to the row's accumulated message (None-safe)."""
        row["message"] = (row.get("message") or "") + " " + text

    for log_dict in dts_run_logs:
        # Read leniently so a malformed entry is skipped rather than fatal.
        log_message = log_dict.get("messageText", "")
        severity = log_dict.get("severity")
        try:
            # Parsing error messages
            if severity == "ERROR":
                job_stats_json_template["job_status"] = "FAILED"
                if "Agent" in log_message:
                    agent_msg_match = re.match(
                        r"Agent (.*). .* \((.*)\).*", log_message
                    )
                    # No None-guard: a non-matching line raises AttributeError
                    # and is reported by the except clause below.
                    # NOTE(review): unlike the INFO branches, the parenthesised
                    # name is not split on "." here — if it is "db.table" the
                    # error lands in a different row than the INFO stats.
                    table_name = agent_msg_match.group(2)
                    row = _row_for(table_name)
                    row["src_table_name"] = table_name
                    row["agent_id"] = agent_msg_match.group(1)
                    _append_message(row, log_message)
                elif "Job" in log_message:
                    job_msg_match = re.match(r"Job (.*) \(table (.*?)\).*", log_message)
                    if job_msg_match:
                        # src_table_name is deliberately not set here, so a row
                        # created only by this branch is dropped at the end
                        # unless the template supplies a truthy default.
                        row = _row_for(job_msg_match.group(2))
                        row["agent_id"] = job_msg_match.group(1)
                        _append_message(row, log_message)

            elif severity == "INFO":
                job_stats_json_template["job_status"] = "SUCCEEDED"
                if "Transfer load" in log_message:
                    run_date = re.match(
                        r"Transfer.* ([0-9]{8})", log_message
                    ).group(1)
                    dts_run_summary_json["run_date"] = run_date

                elif "Finished extracting data" in log_message:
                    extraction_complete_match = re.match(
                        r"^Agent (.*)\. EXTRACT \((.*)\)\:.*\: (.*)$", log_message
                    )
                    if extraction_complete_match:
                        # "db.table" -> "table"
                        table_name = extraction_complete_match.group(2).split(".")[1]
                        row = _row_for(table_name)
                        row["src_table_name"] = table_name
                        row["agent_id"] = extraction_complete_match.group(1)
                        row["extract_duration"] = extraction_complete_match.group(3)
                        dts_run_summary_json[
                            "agent_id"
                        ] = extraction_complete_match.group(1)

                elif "Uploading" in log_message:
                    file_path_match = re.match(
                        r"^Agent .* EXTRACT \((.*)\): .* (gs:\/\/.*).$", log_message
                    )
                    if file_path_match:
                        table_name = file_path_match.group(1).split(".")[1]
                        row = _row_for(table_name)
                        row["src_table_name"] = table_name
                        row["gcs_file_path"] = file_path_match.group(2)

                elif ": Extracting data" in log_message:
                    extract_stat_match = re.match(
                        r"^Agent .* EXTRACT \((.*)\)\: (.*)\:.*\(approximate (.*)\).*\[(.*)\] into (\d*).* (\d*) session.*$",
                        log_message,
                    )
                    if extract_stat_match:
                        table_name = extract_stat_match.group(1).split(".")[1]
                        row = _row_for(table_name)
                        row["src_table_name"] = table_name
                        row["extract_data_size"] = extract_stat_match.group(3)
                        row["extract_partitions"] = (
                            "[" + extract_stat_match.group(4) + "]"
                        )
                        row["extract_files"] = extract_stat_match.group(5)
                        row["extract_sessions"] = extract_stat_match.group(6)

                elif ": Running" in log_message:
                    transfer_mode_match = re.match(
                        r"^Agent .* EXTRACT \((.*)\): Running in (.*) data transfer .*$",
                        log_message,
                    )
                    if transfer_mode_match:
                        table_name = transfer_mode_match.group(1).split(".")[1]
                        row = _row_for(table_name)
                        row["src_table_name"] = table_name
                        row["transfer_mode"] = transfer_mode_match.group(2)

                elif "Number of records" in log_message:
                    job_summary_match = re.match(
                        r"Job (.*) \(table (.*)\) .* records: (\d*),.* (\d*).",
                        log_message,
                    )
                    if job_summary_match:
                        table_name = job_summary_match.group(2)
                        row = _row_for(table_name)
                        row["src_table_name"] = table_name
                        row["bq_job_id"] = job_summary_match.group(1)
                        row["success_records"] = job_summary_match.group(3)
                        row["error_records"] = job_summary_match.group(4)

                elif "Summary:" in log_message:
                    # No None-guard: a non-matching line is reported below.
                    run_summary_match = re.match(
                        r"^Summary: succeeded (\d*).*failed (\d*).*", log_message
                    )
                    dts_run_summary_json["succeeded_jobs"] = run_summary_match.group(1)
                    dts_run_summary_json["failed_jobs"] = run_summary_match.group(2)

        except Exception as e:
            # Best-effort parsing: report the offending line and keep going.
            logging.error(
                "Method _process_transfer_logs: failed to extract value from log"
                f"\n\tlog line: {log_message}\n\terror: {e}"
            )

    job_stats_json_rows = []
    for row_dict in job_stats_jsons.values():
        row_dict["run_date"] = run_date

        # Any accumulated error text marks the table as not succeeded; a
        # "Skipping" error is reported separately from a hard failure.
        message = row_dict.get("message")
        if not message:
            row_dict["transfer_run_state"] = "SUCCEEDED"
        elif "Skipping" in message:
            row_dict["transfer_run_state"] = "SKIPPED"
        else:
            row_dict["transfer_run_state"] = "FAILED"

        # Rows that never acquired a source table name are not reported.
        if row_dict.get("src_table_name"):
            job_stats_json_rows.append(row_dict)

    logging.info(
        f"Method _process_transfer_logs: extracted {len(job_stats_json_rows)} transfer run job rows from logs"
    )

    ti.xcom_push(key="dts_run_summary_json", value=dts_run_summary_json)
    ti.xcom_push(key="job_stats_json_rows", value=job_stats_json_rows)