in src/datamigration/dags/teradata/teradata_transfer_run_log_dag.py [0:0]
def _process_transfer_logs(ti) -> None:
    """
    Processes DTS logs and extracts relevant information for tables' migration

    Pulls the transfer identifiers, the job-stats row template, the run summary
    and the run logs from XCom, parses each log entry with regular expressions
    and pushes two XCom values:

    * ``dts_run_summary_json``: the pulled summary dict, updated in place with
      ``run_date``, ``agent_id``, ``succeeded_jobs`` and ``failed_jobs``
      whenever the corresponding log lines are found.
    * ``job_stats_json_rows``: a list with one dict per table seen in the
      logs (a copy of the template plus the parsed fields, ``run_date`` and
      ``transfer_run_state``). Rows whose ``src_table_name`` is falsy are
      left out.

    Each log entry is read as a dict with ``messageText`` and ``severity``
    keys; only the ``ERROR`` and ``INFO`` severities are parsed. A log line
    that cannot be parsed is logged and skipped, it never aborts the task.

    Args:
        ti: object exposing ``xcom_pull`` and ``xcom_push`` (presumably the
            Airflow task instance -- confirm against the DAG definition).
    """
    transfer_config_id = ti.xcom_pull(
        key="transfer_config_id", task_ids="load_parameters"
    )
    transfer_run_id = ti.xcom_pull(key="transfer_run_id", task_ids="load_parameters")
    unique_id = ti.xcom_pull(key="unique_id", task_ids="load_parameters")
    dts_run_summary_json = ti.xcom_pull(
        key="dts_run_summary_json", task_ids="get_transfer_run_summary"
    )
    job_stats_json_template = ti.xcom_pull(
        key="job_stats_json", task_ids="load_parameters"
    )
    dts_run_logs = ti.xcom_pull(key="dts_run_logs", task_ids="get_transfer_run_logs")

    job_stats_json_template["unique_id"] = unique_id
    job_stats_json_template["transfer_run_id"] = transfer_run_id
    job_stats_json_template["transfer_config_id"] = transfer_config_id

    # Per-table stats rows, keyed by the table name parsed from the logs.
    job_stats_jsons = {}
    # Stays None when no "Transfer load" line is parsed, so building the rows
    # below can never hit an unbound name.
    run_date = None

    def _table_row(table_name, set_src_table_name=True):
        """Return the stats row of table_name, creating it from the template."""
        if table_name not in job_stats_jsons:
            job_stats_jsons[table_name] = job_stats_json_template.copy()
        row = job_stats_jsons[table_name]
        if set_src_table_name:
            # Assigned on every call (always the same value for a given key),
            # so a row first created without a name still receives one as soon
            # as a line that knows the table name is parsed.
            row["src_table_name"] = table_name
        return row

    for log_dict in dts_run_logs:
        log_message = log_dict["messageText"]
        try:
            severity = log_dict["severity"]
            # Parsing error messages
            if severity == "ERROR":
                # NOTE(review): job_status is baked into a row when the row is
                # created, so it reflects the severity of the first line seen
                # for the table; later lines do not update it -- confirm this
                # is intended.
                job_stats_json_template["job_status"] = "FAILED"
                if "Agent" in log_message:
                    agent_msg_match = re.match(
                        r"Agent (.*). .* \((.*)\).*", log_message
                    )
                    # No guard on purpose: a non-matching line raises here and
                    # is reported by the handler below.
                    row = _table_row(agent_msg_match.group(2))
                    row["agent_id"] = agent_msg_match.group(1)
                    row["message"] = row["message"] + " " + log_message
                elif "Job" in log_message:
                    job_msg_match = re.match(r"Job (.*) \(table (.*?)\).*", log_message)
                    if job_msg_match:
                        # NOTE(review): this branch stores the job id under
                        # "agent_id" and does not set src_table_name, so a row
                        # created only here is dropped by the final filter when
                        # the template's src_table_name is falsy. Looks like it
                        # may have been meant to mirror the "Number of records"
                        # branch (bq_job_id) -- confirm before changing.
                        row = _table_row(
                            job_msg_match.group(2), set_src_table_name=False
                        )
                        row["agent_id"] = job_msg_match.group(1)
                        row["message"] = row["message"] + " " + log_message
            elif severity == "INFO":
                job_stats_json_template["job_status"] = "SUCCEEDED"
                if "Transfer load" in log_message:
                    run_date_match = re.match(r"Transfer.* ([0-9]{8})", log_message)
                    run_date = run_date_match.group(1)
                    dts_run_summary_json["run_date"] = run_date
                elif "Finished extracting data" in log_message:
                    extraction_complete_match = re.match(
                        r"^Agent (.*)\. EXTRACT \((.*)\)\:.*\: (.*)$", log_message
                    )
                    if extraction_complete_match:
                        agent_id = extraction_complete_match.group(1)
                        # The parenthesised name is split on "." and the second
                        # component is used as the table name.
                        row = _table_row(
                            extraction_complete_match.group(2).split(".")[1]
                        )
                        row["agent_id"] = agent_id
                        row["extract_duration"] = extraction_complete_match.group(3)
                        dts_run_summary_json["agent_id"] = agent_id
                elif "Uploading" in log_message:
                    file_path_match = re.match(
                        r"^Agent .* EXTRACT \((.*)\): .* (gs:\/\/.*).$", log_message
                    )
                    if file_path_match:
                        row = _table_row(file_path_match.group(1).split(".")[1])
                        row["gcs_file_path"] = file_path_match.group(2)
                elif ": Extracting data" in log_message:
                    extract_stat_match = re.match(
                        r"^Agent .* EXTRACT \((.*)\)\: (.*)\:.*\(approximate (.*)\).*\[(.*)\] into (\d*).* (\d*) session.*$",
                        log_message,
                    )
                    if extract_stat_match:
                        row = _table_row(extract_stat_match.group(1).split(".")[1])
                        row["extract_data_size"] = extract_stat_match.group(3)
                        row["extract_partitions"] = (
                            "[" + extract_stat_match.group(4) + "]"
                        )
                        row["extract_files"] = extract_stat_match.group(5)
                        row["extract_sessions"] = extract_stat_match.group(6)
                elif ": Running" in log_message:
                    transfer_mode_match = re.match(
                        r"^Agent .* EXTRACT \((.*)\): Running in (.*) data transfer .*$",
                        log_message,
                    )
                    if transfer_mode_match:
                        row = _table_row(transfer_mode_match.group(1).split(".")[1])
                        row["transfer_mode"] = transfer_mode_match.group(2)
                elif "Number of records" in log_message:
                    job_summary_match = re.match(
                        r"Job (.*) \(table (.*)\) .* records: (\d*),.* (\d*).",
                        log_message,
                    )
                    if job_summary_match:
                        row = _table_row(job_summary_match.group(2))
                        row["bq_job_id"] = job_summary_match.group(1)
                        row["success_records"] = job_summary_match.group(3)
                        row["error_records"] = job_summary_match.group(4)
                elif "Summary:" in log_message:
                    run_summary_match = re.match(
                        r"^Summary: succeeded (\d*).*failed (\d*).*", log_message
                    )
                    dts_run_summary_json["succeeded_jobs"] = run_summary_match.group(1)
                    dts_run_summary_json["failed_jobs"] = run_summary_match.group(2)
        except Exception as e:
            # Deliberately broad: parsing is best effort, one malformed log
            # line must not prevent the remaining lines from being processed.
            logging.error(
                "Method _process_transfer_logs: failed to extract value from log "
                "\n\tlog line: %s\n\terror: %s",
                log_message,
                e,
            )

    job_stats_json_rows = []
    for row_dict in job_stats_jsons.values():
        row_dict["run_date"] = run_date
        # Only ERROR lines append to "message", so an empty message means no
        # error was recorded for the table.
        if not row_dict["message"]:
            row_dict["transfer_run_state"] = "SUCCEEDED"
        elif "Skipping" in row_dict["message"]:
            row_dict["transfer_run_state"] = "SKIPPED"
        else:
            row_dict["transfer_run_state"] = "FAILED"
        if row_dict["src_table_name"]:
            job_stats_json_rows.append(row_dict)

    logging.info(
        "Method _process_transfer_logs: extracted %s transfer run job rows from logs",
        len(job_stats_json_rows),
    )
    ti.xcom_push(key="dts_run_summary_json", value=dts_run_summary_json)
    ti.xcom_push(key="job_stats_json_rows", value=job_stats_json_rows)