distributed_nhc/export_nhc_result_to_kusto.py (210 lines of code) (raw):

#!/usr/bin/python3 import sys import os import json import re import subprocess from datetime import datetime from csv import DictReader from argparse import ArgumentParser from azure.identity import ManagedIdentityCredential, DefaultAzureCredential from azure.kusto.data import KustoConnectionStringBuilder from azure.kusto.ingest import QueuedIngestClient, IngestionProperties import pandas as pd def ingest_health_log(health_file, creds, ingest_url, database, health_table_name): filename_parts = os.path.basename(health_file).split("-", maxsplit=2) ts_str = filename_parts[2].split(".")[0] ts = datetime.strptime(ts_str, "%Y-%m-%d_%H-%M-%S") job_name = filename_parts[1] if job_name == "pssh": job_name = f"{job_name}-{ts_str}" with open(health_file, 'r') as f: lines = f.readlines() reader = DictReader(lines, fieldnames = ["Hostname", "RawResult"], delimiter='|', restkey="extra") df = pd.DataFrame(reader) df['Timestamp'] = ts df['JobName'] = job_name df['NodeName'] = df.apply(lambda x: x['Hostname'].strip(), axis=1) df['RawResult'] = df.apply(lambda x: x['RawResult'].strip(), axis=1) df['Healthy'] = df.apply(lambda x: x['RawResult'] == "Healthy", axis=1) df = df[['Timestamp', 'JobName', 'Hostname', 'Healthy', 'RawResult']] ingest_client = QueuedIngestClient(KustoConnectionStringBuilder.with_azure_token_credential(ingest_url, creds)) print(f"Ingesting health results from {os.path.basename(health_file)} into {ingest_url} at {database}/{health_table_name}") ingest_client.ingest_from_dataframe(df, IngestionProperties(database, health_table_name)) def ingest_debug_log(debug_file, creds, ingest_url, database, debug_table_name): filename_parts = os.path.basename(debug_file).split("-", maxsplit=2) ts_str = filename_parts[2].split(".")[0] ts = datetime.strptime(ts_str, "%Y-%m-%d_%H-%M-%S") job_name = filename_parts[1] if job_name == "pssh": job_name = f"{job_name}-{ts_str}" with open(debug_file, 'r') as f: lines = f.readlines() reader = DictReader(lines, fieldnames = ["Hostname", "DebugLog"], delimiter='|', restkey="extra") df = pd.DataFrame(reader) df['Timestamp'] = ts df['JobName'] = job_name df['NodeName'] = df.apply(lambda x: x['Hostname'].strip(), axis=1) df['DebugLog'] = df.apply(lambda x: x['DebugLog'].strip(), axis=1) df = df[['Timestamp', 'JobName', 'Hostname', 'DebugLog']] ingest_client = QueuedIngestClient(KustoConnectionStringBuilder.with_azure_token_credential(ingest_url, creds)) print(f"Ingesting health results from {os.path.basename(debug_file)} into {ingest_url} at {database}/{debug_table_name}") ingest_client.ingest_from_dataframe(df, IngestionProperties(database, debug_table_name)) def run_command(cmd): result = subprocess.run(cmd, capture_output=True, shell=True, text=True) return result.stdout.strip() def get_nhc_json_formatted_result(results_file): def natural_sort_key(s): return [int(text) if text.isdigit() else text.lower() for text in re.split(r'(\d+)', s)] # check if GPU or CPU processor_cmd = f"lspci | grep -iq NVIDIA" # if not empty, then GPU processor_str = run_command(processor_cmd) processor = "GPU" if processor_str else "CPU" if processor == "GPU": ib_write_lb_mlx5_ib_cmd = f"cat {results_file} | grep -o 'ib_write_lb_mlx5_ib[0-7]: .*'" ib_write_lb_mlx5_ib_str = run_command(ib_write_lb_mlx5_ib_cmd) ib_write_lb_mlx5_ib_str = sorted(ib_write_lb_mlx5_ib_str.strip().split("\n"), key=natural_sort_key) ib_write_lb_mlx5_ib_str = '\n'.join(ib_write_lb_mlx5_ib_str) # convert to string H2D_GPU_cmd = f"cat {results_file} | grep -o 'H2D_GPU_[0-7]: .*'" H2D_GPU_str = run_command(H2D_GPU_cmd) D2H_GPU_cmd = f"cat {results_file} | grep -o 'D2H_GPU_[0-7]: .*'" D2H_GPU_str = run_command(D2H_GPU_cmd) P2P_GPU_cmd = f"cat {results_file} | grep -o 'P2P_GPU_[0-7]_[0-7]: .*'" P2P_GPU_str = run_command(P2P_GPU_cmd) nccl_all_red_cmd = f"cat {results_file} | grep -o 'nccl_all_red: .*'" nccl_all_red_str = run_command(nccl_all_red_cmd) nccl_all_red_lb_cmd = f"cat {results_file} | grep -o 'nccl_all_red_lb: .*'" nccl_all_red_lb_str = run_command(nccl_all_red_lb_cmd) data_string = "\n".join([ib_write_lb_mlx5_ib_str, H2D_GPU_str, D2H_GPU_str, P2P_GPU_str, nccl_all_red_str, nccl_all_red_lb_str]) data_string = os.linesep.join([s for s in data_string.splitlines() if s]) # remove empty lines result = {"IB_WRITE_GDR": {}, "GPU_BW_HTD": {}, "GPU_BW_DTH": {}, "GPU_BW_P2P": {}, "NCCL_ALL_REDUCE": {}, "NCCL_ALL_REDUCE_LOOP_BACK": {}} # Split the string by lines and create key-value pairs for line in data_string.strip().split("\n"): if not line or line.isspace(): continue key, value = line.split(":") if key.startswith("ib_write_lb_mlx5_ib"): result["IB_WRITE_GDR"][key] = str(value.strip()) elif key.startswith("H2D"): result["GPU_BW_HTD"][key] = str(value.strip()) elif key.startswith("D2H"): result["GPU_BW_DTH"][key] = str(value.strip()) elif key.startswith("P2P"): result["GPU_BW_P2P"][key] = str(value.strip()) elif key.startswith("nccl_all_red_lb"): result["NCCL_ALL_REDUCE_LOOP_BACK"] = str(value.strip()) elif key.startswith("nccl_all_red"): result["NCCL_ALL_REDUCE"] = str(value.strip()) else: # processor == "CPU" ib_write_lb_mlx5_ib_cmd = f"cat {results_file} | grep -o 'ib_write_lb_mlx5_ib[0-7]: .*'" ib_write_lb_mlx5_ib_str = run_command(ib_write_lb_mlx5_ib_cmd) ib_write_lb_mlx5_ib_str = sorted(ib_write_lb_mlx5_ib_str.strip().split("\n"), key=natural_sort_key) ib_write_lb_mlx5_ib_str = '\n'.join(ib_write_lb_mlx5_ib_str) # convert to string stream_Copy_cmd = f"cat {results_file} | grep -o 'stream_Copy: .*'" stream_Copy_str = run_command(stream_Copy_cmd) stream_Add_cmd = f"cat {results_file} | grep -o 'stream_Add: .*'" stream_Add_str = run_command(stream_Add_cmd) stream_Scale_cmd = f"cat {results_file} | grep -o 'stream_Scale: .*'" stream_Scale_str = run_command(stream_Scale_cmd) stream_Triad_cmd = f"cat {results_file} | grep -o 'stream_Triad: .*'" stream_Triad_str = run_command(stream_Triad_cmd) data_string = "\n".join([ib_write_lb_mlx5_ib_str, stream_Copy_str, stream_Add_str, stream_Scale_str, stream_Triad_str]) data_string = os.linesep.join([s for s in data_string.splitlines() if s]) # remove empty lines result = {"IB_WRITE_NON_GDR": {}, "stream_Copy": {}, "stream_Add": {}, "stream_Scale": {}, "stream_Triad": {}} # Split the string by lines and create key-value pairs for line in data_string.strip().split("\n"): if not line or line.isspace(): continue key, value = line.split(":") if key.startswith("ib_write_lb_mlx5_ib"): result["IB_WRITE_NON_GDR"][key] = str(value.strip()) elif key.startswith("stream_Copy"): result["stream_Copy"]= str(value.strip()) elif key.startswith("stream_Add"): result["stream_Add"]= str(value.strip()) elif key.startswith("stream_Scale"): result["stream_Scale"]= str(value.strip()) elif key.startswith("stream_Triad"): result["stream_Triad"]= str(value.strip()) return result def ingest_results(results_file, creds, ingest_url, database, results_table_name, nhc_run_uuid="None"): ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") job_name = results_file.replace("\\", "/").split(".")[0].split("/")[-1] # account for \ or / in path uuid = job_name if nhc_run_uuid == "None" else f"{nhc_run_uuid}-{job_name}" if uuid == "health": uuid = "" else : uuid = "-" + uuid # add the dash here instead of below; this way if 'uuid' is empty, we don't have a trailing dash full_uuid = f"nhc-{ts}{uuid}" vmSize_bash_cmd = "echo $( curl -H Metadata:true --max-time 10 -s \"http://169.254.169.254/metadata/instance/compute/vmSize?api-version=2021-01-01&format=text\") | tr '[:upper:]' '[:lower:]' " vmSize = run_command(vmSize_bash_cmd) vmId_bash_cmd = "curl -H Metadata:true --max-time 10 -s \"http://169.254.169.254/metadata/instance/compute/vmId?api-version=2021-02-01&format=text\"" vmId = run_command(vmId_bash_cmd) vmName_bash_cmd = "hostname" vmName = run_command(vmName_bash_cmd) physhost = run_command("echo $(hostname) \"$(/opt/azurehpc/tools/kvp_client | grep Fully)\" | cut -d ':' -f 3 | cut -d ' ' -f 2 | sed 's/\"//g'") if not physhost: physhost = "not Mapped" with open(results_file, 'r') as f: full_results = f.read() jsonResultDict = get_nhc_json_formatted_result(results_file) jsonResult = json.dumps(jsonResultDict) record = { 'vmSize': vmSize, 'vmId': vmId, 'vmHostname': vmName, 'physHostname': physhost, 'workflowType': "main", 'time': ts, 'pass': False, # keep as default false 'errors': '', 'logOutput': full_results, # the entire file 'jsonResult': jsonResult, 'uuid': full_uuid } if "ERROR" in full_results: record['pass'] = False record['errors'] = full_results elif "Node Health Check completed successfully" in full_results: record['pass'] = True else: record['pass'] = False record['errors'] = "No Node Health Check completed successfully or ERROR" df = pd.DataFrame(record, index=[0]) ingest_client = QueuedIngestClient(KustoConnectionStringBuilder.with_azure_token_credential(ingest_url, creds)) print(f"Ingesting results from {os.path.basename(results_file)} into {ingest_url} at {database}/{results_table_name}") ingest_client.ingest_from_dataframe(df, IngestionProperties(database, results_table_name)) def parse_args(): parser = ArgumentParser(description="Ingest NHC results into Kusto") parser.add_argument("health_files", nargs="+", help="List of .health.log or .debug.log files to ingest") parser.add_argument("--ingest_url", help="Kusto ingest URL", required=True) parser.add_argument("--database", help="Kusto database", required=True) parser.add_argument("--health_table_name", default="NodeHealthCheck", help="Kusto table name for health results") parser.add_argument("--debug_table_name", default="NodeHealthCheck_Debug", help="Kusto table name for debug results") parser.add_argument("--results_table_name", default="AzNhcRunEvents", help="Kusto table name for results") parser.add_argument("--uuid", default="None", help="UUID to help identify results in Kusto table") parser.add_argument("--identity", nargs="?", const=True, default=False, help="Managed Identity to use for authentication, if a client ID is provided it will be used, otherwise the system-assigned identity will be used. If --identity is not provided DefaultAzureCredentials will be used.") return parser.parse_args() def get_creds(identity): if identity is True: return ManagedIdentityCredential() elif identity: return ManagedIdentityCredential(client_id=identity) else: return DefaultAzureCredential() args = parse_args() creds = get_creds(args.identity) print(f"Attempting to ingest: {', '.join(args.health_files)}") for health_file in args.health_files: try: if not os.path.exists(health_file): raise FileNotFoundError(f"Cannot find file '{health_file}'") if health_file.endswith(".health.log"): ingest_health_log(health_file, creds, args.ingest_url, args.database, args.health_table_name) elif health_file.endswith(".debug.log"): ingest_debug_log(health_file, creds, args.ingest_url, args.database, args.debug_table_name) elif health_file.endswith(".log"): ingest_results(health_file, creds, args.ingest_url, args.database, args.results_table_name, args.uuid) else: raise Exception("Unsupported file, must be .health.log or .debug.log produced by ./distributed_nhc.sb.sh, or .log produced by run-health-checks.sh") except FileNotFoundError: if len(args.health_files) == 1: print(f"Cannot find file '{health_file}'") raise print(f"Cannot find file '{health_file}', skipping...")