in src/datamigration/dags/redshift/redshift_transfer_run_log_dag.py [0:0]
import logging
import re

def _process_transfer_logs(ti) -> None:
"""
    Parses DTS transfer run logs and extracts per-table migration statistics
    and run-level summary fields.
"""
transfer_config_id = ti.xcom_pull(
key="transfer_config_id", task_ids="load_parameters"
)
transfer_run_id = ti.xcom_pull(key="transfer_run_id", task_ids="load_parameters")
unique_id = ti.xcom_pull(key="unique_id", task_ids="load_parameters")
dts_run_summary_json = ti.xcom_pull(
key="dts_run_summary_json", task_ids="get_transfer_run_summary"
)
job_stats_json_template = ti.xcom_pull(
key="job_stats_json", task_ids="load_parameters"
)
dts_run_logs = ti.xcom_pull(key="dts_run_logs", task_ids="get_transfer_run_logs")
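    # Stamp run-level identifiers onto the template copied for each table row.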
job_stats_json_template["unique_id"] = unique_id
job_stats_json_template["transfer_run_id"] = transfer_run_id
job_stats_json_template["transfer_config_id"] = transfer_config_id
    # Per-table stats rows, keyed by source table name.
    job_stats_jsons = {}
    # run_date is normally set from a "Transfer load" INFO line below; initialize
    # it so the aggregation loop cannot hit a NameError when that line is absent.
    run_date = None
    # TODO: add parsing logic for empty table migration
    # TODO: shorten error messages before inserting into BigQuery
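    # Each DTS log entry is a dict with "severity" and "messageText" keys; the
    # regexes below encode the message shapes this parser expects, e.g.
    # "Job <bq_job_id> (table <name>) ..." and "Summary: succeeded <n> ... failed <m> ...".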
for log_dict in dts_run_logs:
log_message = log_dict["messageText"]
try:
# Parsing error messages
if log_dict["severity"] == "ERROR":
job_stats_json_template["job_status"] = "FAILED"
                # Job-level failures carry the BigQuery job id and table name.
                if "Job" in log_message:
                    job_msg_match = re.match(r"Job (.*) \(table (.*?)\).*", log_message)
                    if job_msg_match:
                        table_name = job_msg_match.group(2)
                        if table_name not in job_stats_jsons:
                            job_stats_jsons[table_name] = job_stats_json_template.copy()
                        job_stats_jsons[table_name]["src_table_name"] = table_name
                        job_stats_jsons[table_name]["bq_job_id"] = job_msg_match.group(1)
                        job_stats_jsons[table_name]["message"] += " " + log_message
                # Otherwise keep the first run-level error message for the summary.
                elif not dts_run_summary_json.get("error_message"):
                    dts_run_summary_json["error_message"] = log_message
                continue
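            # INFO lines carry the run date, per-table load stats, and the run summary.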
            elif log_dict["severity"] == "INFO":
                job_stats_json_template["job_status"] = "SUCCEEDED"
                # "Transfer load ..." lines carry an 8-digit run date.
                if "Transfer load" in log_message:
                    run_date_match = re.match(r"Transfer.* ([0-9]{8})", log_message)
                    if run_date_match:
                        run_date = run_date_match.group(1)
                        dts_run_summary_json["run_date"] = run_date
                    continue
                # Kept for reference: maps the source table name to the target table
                # name from "data will be appended to existing table" log lines.
# elif log_message.__contains__("data will be appended to existing table"):
# # For target table name
# target_summary_match = re.match(
# r"Table (.*) already exists in (.*);",
# log_message,
# )
# target_dataset_name = target_summary_match.group(2).split(".")[1]
# target_table_name = (
# target_dataset_name + "." + target_summary_match.group(1)
# )
# logging.info(target_table_name)
# continue
                elif "Number of records" in log_message:
                    # Per-table load result: BigQuery job id, table name, and counts
                    # of successfully loaded and failed records.
                    job_summary_match = re.match(
                        r"Job (.*) \(table (.*)\) .* records: (\d*),.* (\d*).",
                        log_message,
                    )
                    if job_summary_match:
                        table_name = job_summary_match.group(2)
                        if table_name not in job_stats_jsons:
                            job_stats_jsons[table_name] = job_stats_json_template.copy()
                        job_stats_jsons[table_name]["src_table_name"] = table_name
                        job_stats_jsons[table_name]["bq_job_id"] = job_summary_match.group(1)
                        job_stats_jsons[table_name]["success_records"] = job_summary_match.group(3)
                        job_stats_jsons[table_name]["error_records"] = job_summary_match.group(4)
                    continue
                elif "Summary:" in log_message:
                    # Run-level totals: "Summary: succeeded <n> ... failed <m> ...".
                    run_summary_match = re.match(
                        r"^Summary: succeeded (\d*).*failed (\d*).*", log_message
                    )
                    if run_summary_match:
                        dts_run_summary_json["succeeded_jobs"] = run_summary_match.group(1)
                        dts_run_summary_json["failed_jobs"] = run_summary_match.group(2)
                    continue
        except Exception as e:
            logging.error(
                "Method _process_transfer_logs: failed to extract value from log"
                f"\n\tlog line: {log_message}\n\terror: {e}"
            )
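    # Derive a per-table transfer state from the accumulated messages and keep
    # only rows that actually carry a source table name.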
    job_stats_json_rows = []
    for row_dict in job_stats_jsons.values():
        row_dict["run_date"] = run_date
        if not row_dict["message"]:
            row_dict["transfer_run_state"] = "SUCCEEDED"
        elif "Skipping" in row_dict["message"]:
            row_dict["transfer_run_state"] = "SKIPPED"
        else:
            row_dict["transfer_run_state"] = "FAILED"
        if row_dict["src_table_name"]:
            job_stats_json_rows.append(row_dict)
logging.info(
f"Method _process_transfer_logs: extracted {len(job_stats_json_rows)} transfer run job rows from logs"
)
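    # Hand the summary and per-table rows to downstream tasks via XCom.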
ti.xcom_push(key="dts_run_summary_json", value=dts_run_summary_json)
ti.xcom_push(key="job_stats_json_rows", value=job_stats_json_rows)