# aiops/MicroAgents/loader/nezha.py
import glob
import os
from .base import BaseLoader
import polars as pl
import re
import json
from datetime import datetime, timedelta
import traceback
import pickle
class NezhaLoader(BaseLoader):
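    """Loader for the Nezha AIOps dataset (https://github.com/IntelligentDDS/Nezha).

    Walks a directory tree whose dated subfolders live under rca_data (anomalous
    runs) and construct_data (normal runs), and loads logs, metrics, traces,
    fault labels and per-service root causes into polars DataFrames.
    """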
def __init__(self, filename, df=None, df_seq=None):
super().__init__(filename, df, df_seq)
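
    @staticmethod
    def _resolve_system_name(date_str):
        # Consolidates the date-folder -> system mapping that was repeated
        # verbatim in load_label/load_metric/load_trace/load_log.
        if date_str in ("2023-01-29", "2023-01-30"):
            return "TrainTicket"
        if date_str in ("2022-08-22", "2022-08-23"):
            return "GShop"
        return "Error"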
def load(self):
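        """Scan every dated subfolder, queue LazyFrame reads for logs, metrics
        and traces, then collect them into self.df (logs), self.df_trace,
        self.df_label and the self.df_metric_* frames."""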
log_queries = []
metric_queries = {
'src_dst': [],
'front_srv': [],
'dep': [],
'default': []
}
trace_queries = []
label_queries = []
all_label_df = pl.DataFrame()
        date_pattern = re.compile(r'\d{4}-\d{2}-\d{2}') # Matches date folder names like 2023-01-29
# Iterate over all subdirectories in the base directory
for subdir, dirs, files in os.walk(self.filename):
# Check if the current subdir is within rca_data or construct_data
if "rca_data" in subdir:
ano_folder = True
elif "construct_data" in subdir:
ano_folder = False
else:
continue # Skip if it's neither of those folders
match = date_pattern.match(os.path.basename(subdir))
if match:
date_str = match.group() # Extract the matched date string
#Log
log_path = os.path.join(subdir, "log")
if os.path.exists(log_path):
self.load_log(log_path, date_str, log_queries, ano_folder)
#Metrics
metric_path = os.path.join(subdir, "metric")
if os.path.exists(metric_path):
self.load_metric(metric_path, date_str, metric_queries, ano_folder)
#Traces
trace_path = os.path.join(subdir, "trace")
if os.path.exists(trace_path):
self.load_trace(trace_path, date_str, trace_queries, ano_folder)
#Labels
label_file = os.path.join(subdir, f"{date_str}-fault_list.json")
if os.path.exists(label_file):
label_df = self.load_label(label_file, date_str)
if label_df is not None:
all_label_df = pl.concat([all_label_df, label_df])
#RCAs per service
#Hipster
                rca_hip_file = os.path.join(subdir, "root_cause_hipster.json")
if os.path.exists(rca_hip_file):
self.df_rca_hip = self.load_rca(rca_hip_file)
#TrainTicket
                rca_ts_file = os.path.join(subdir, "root_cause_ts.json")
if os.path.exists(rca_ts_file):
self.df_rca_ts = self.load_rca(rca_ts_file)
self.df_label = all_label_df
        #Collect the LazyFrame queries that were queued above
#Collect logs
dataframes = pl.collect_all(log_queries)
self.df = pl.concat(dataframes)
self.df = self.df.rename({"Log":"raw_m_message"})
#Collect traces
dataframes = pl.collect_all(trace_queries)
self.df_trace = pl.concat(dataframes)
# Collect metrics
for group, queries in metric_queries.items():
if queries:
try:
dataframes = pl.collect_all(queries)
if dataframes:
# Standardize column order based on the first DataFrame
reference_columns = dataframes[0].columns
standardized_dfs = [df.select(reference_columns) for df in dataframes]
df = pl.concat(standardized_dfs)
#Metrics are set here
setattr(self, f'df_metric_{group}', df)
except pl.exceptions.ShapeError as e:
print(f"Error concatenating group '{group}': {e}")
# Debugging: print out column names for each DataFrame in the group
for q in queries:
collected_df = q.collect()
print(f"Columns in {group}: {collected_df.columns}")
raise
    def get_data_at_time(self, time_str, time_window=None, data_type='metric'):
        """Return rows of the requested frame whose m_timestamp lies in
        [time_str - time_window, time_str). time_window is a dict of
        timedelta kwargs, e.g. {"minutes": 5}."""
        sources = {
            'metric': self.df_metric_default,
            'trace': self.df_trace,
            'log': self.df
        }
        if data_type not in sources:
            raise ValueError(f"Invalid data type: {data_type}")
        # Timestamps look like 2023-01-29T09:33:09.036923751Z; Python's strptime
        # only supports microseconds, so truncate the fraction to 6 digits first.
        time_str = re.sub(r"(\.\d{6})\d*Z$", r"\1Z", time_str)
        end_time = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ")
        start_time = end_time - timedelta(**(time_window or {}))
        return sources[data_type].filter(
            (pl.col("m_timestamp") >= start_time) & (pl.col("m_timestamp") < end_time)
        )
    def load_rca(self, file_path):
        """Read a per-service root-cause JSON file into a DataFrame with one row per service."""
with open(file_path, 'r') as file:
data = json.load(file)
# Reshape the data
reshaped_data = []
for service, metrics in data.items():
row = {'service': service}
row.update(metrics)
reshaped_data.append(row)
# Create a DataFrame
return pl.DataFrame(reshaped_data)
def load_label(self, file_path, date_str):
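        """Parse a {date}-fault_list.json into a label DataFrame, tagging each
        record with the date folder, label file name and system name."""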
label_data = pl.DataFrame()
try:
with open(file_path, 'r') as file:
                data = json.load(file) #Polars' JSON reader cannot handle the non-standard JSON, so use json.load
# Iterate over each key in the JSON and extract the records
            system_name = self._resolve_system_name(date_str)
for key in data:
records = data[key]
if records: # Check if the list is not empty
df = pl.DataFrame(records)
df = df.with_columns(
pl.lit(date_str).alias('date_folder'),
pl.lit(os.path.basename(file_path)).alias('label_file_name'),
pl.lit(system_name).alias('system_name')
)
label_data = label_data.vstack(df)
except json.JSONDecodeError as e:
print(f"JSON decoding error in file: {file_path}")
print(f"Error: {e}")
except Exception as e:
print(f"Error processing file: {file_path}")
print(f"Error: {e}")
traceback.print_exc()
return label_data
def load_metric(self, folder_path, date_str, queries, ano_folder):
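        """Queue LazyFrame scans for one date folder's metric CSVs, grouped by
        file kind (src_dst / front_srv / dep / default)."""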
        if ano_folder: #Metrics are copied to both rca_data and construct_data, so load them only once
return
file_pattern = os.path.join(folder_path, "*.csv")
for file in glob.glob(file_pattern):
try:
                system_name = self._resolve_system_name(date_str)
file_name = os.path.basename(file)
q = pl.scan_csv(file, has_header=True, infer_schema_length=0, separator=",")
                q = q.with_columns(
                    pl.lit(date_str).alias('date_folder'), # Folder name carries the date
                    pl.lit(os.path.basename(file)).alias('metric_file_name'), # File name storage
                    pl.lit(system_name).alias('system_name')
                )
if "source_50.csv" in file_name or "destination_50.csv" in file_name:
queries['src_dst'].append(q)
elif "front_service.csv" in file_name:
queries['front_srv'].append(q)
elif "dependency.csv" in file_name:
queries['dep'].append(q)
else:
queries['default'].append(q)
except pl.exceptions.NoDataError:
continue
def load_trace(self, folder_path, date_str, queries, ano_folder):
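        """Queue LazyFrame scans for one date folder's trace CSVs."""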
        system_name = self._resolve_system_name(date_str)
file_pattern = os.path.join(folder_path, "*.csv")
for file in glob.glob(file_pattern):
try:
q = pl.scan_csv(file, has_header=True, separator=",", row_count_name="row_nr_per_file")
                q = q.with_columns(
                    pl.lit(date_str).alias('date_folder'), # Folder name carries the date
                    pl.lit(os.path.basename(file)).alias('trace_file_name'), # File name storage
                    pl.lit(ano_folder).alias('anomaly_folder'),
                    pl.lit(system_name).alias('system_name')
                )
queries.append(q)
except pl.exceptions.NoDataError: # some CSV files can be empty.
continue
def load_log(self, folder_path, date_str, queries, ano_folder):
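        """Queue LazyFrame scans for one date folder's log CSVs."""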
file_pattern = os.path.join(folder_path, "*.csv")
for file in glob.glob(file_pattern):
try:
q = pl.scan_csv(file, has_header=True, infer_schema_length=0, separator=",",
row_count_name="row_nr_per_file", truncate_ragged_lines=True)
                system_name = self._resolve_system_name(date_str)
                q = q.with_columns(
                    pl.lit(date_str).alias('date_folder'), # Folder name carries the date
                    pl.lit(os.path.basename(file)).alias('log_file_name'), # File name storage
                    pl.lit(ano_folder).alias('anomaly_folder'),
                    pl.lit(system_name).alias('system_name')
                )
queries.append(q)
except pl.exceptions.NoDataError: # some CSV files can be empty.
continue
# def _execute(self):
# if self.df is None:
# self.load()
# self.preprocess()
# return self.df
def preprocess(self):
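        """Clean log messages, parse timestamps and attach anomaly labels to
        the log, trace and metric frames."""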
        #Blank out the 221 log messages that leak anomaly injection info ("Inject cpu successfully")
        self.df = self.df.with_columns(pl.col("raw_m_message").str.replace_all("Inject cpu successfully", ""))
self._extract_log_message()
self.parse_timestamps()
self.process_metrics()
self.add_labels_to_metrics()
self.df = self.df.with_row_count()
self.df_trace = self.df_trace.with_row_count()
self.df = self.add_labels_to_df(self.df)
self.df_trace = self.add_labels_to_df(self.df_trace)
def process_metrics(self):
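        """Add the label time windows (+1/+3/+4 minutes after injection) and
        cast the metric columns to Float64."""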
self.df_label = self.df_label.with_columns(
(pl.col("m_timestamp") + pl.duration(minutes=1)).alias("m_timestamp+1"),
(pl.col("m_timestamp") + pl.duration(minutes=3)).alias("m_timestamp+3"),
(pl.col("m_timestamp") + pl.duration(minutes=4)).alias("m_timestamp+4"),
)
self.df_metric_default = self.df_metric_default.with_row_count()
column_names = [
"CpuUsage(m)", "CpuUsageRate(%)", "MemoryUsage(Mi)", "MemoryUsageRate(%)",
"SyscallRead","SyscallWrite","NetworkReceiveBytes", "NetworkTransmitBytes",
"PodClientLatencyP90(s)", "PodServerLatencyP90(s)", "PodClientLatencyP95(s)",
"PodServerLatencyP95(s)", "PodClientLatencyP99(s)", "PodServerLatencyP99(s)",
"PodWorkload(Ops)", "PodSuccessRate(%)", "NodeCpuUsageRate(%)",
"NodeMemoryUsageRate(%)", "NodeNetworkReceiveBytes"
]
        # Cast all metric columns (read as strings) to Float64 in one go
        self.df_metric_default = self.df_metric_default.cast({col: pl.Float64 for col in column_names})
def _extract_log_message(self):
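        """Extract the actual message and severity from the raw, partly
        double-encoded JSON log lines."""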
        self.df = self.df.with_row_count("row_key") # Used for matching the extracted messages back later
        #Split rows into valid JSON (containing message/severity/timestamp keys) and the rest
        self.df = self.df.with_columns(normal_json = (pl.col("raw_m_message").str.contains("message")) &
                                       (pl.col("raw_m_message").str.contains("severity")) &
                                       (pl.col("raw_m_message").str.contains("timestamp")))
df_normal_json = self.df.filter(pl.col("normal_json")).select("raw_m_message", "row_key", "SpanID" )
df_abnormal_json = self.df.filter(~pl.col("normal_json")).select("raw_m_message", "row_key", "SpanID" )
#Double decode json
df_normal_json = df_normal_json.with_columns(pl.col("raw_m_message").str.json_decode())
df_normal_json = df_normal_json.with_columns(pl.col("raw_m_message").struct.field("log"))
df_normal_json = df_normal_json.with_columns(pl.col("log").str.json_decode())
#extract message and severity
df_normal_json = df_normal_json.with_columns(pl.col("log").struct.field("message"))
df_normal_json = df_normal_json.with_columns(pl.col("log").struct.field("severity"))
df_normal_json = df_normal_json.drop(["raw_m_message", "log"])
#Prepare abnormal for merge
df_abnormal_json = df_abnormal_json.with_columns(severity = pl.lit(""))
df_abnormal_json = df_abnormal_json.rename({"raw_m_message":"message"})
df_abnormal_json = df_abnormal_json.select(df_normal_json.columns)
#The dataframes have now equal fields and no overlap -> vertical stack
df_t3 = df_normal_json.vstack(df_abnormal_json)
        #Each log message contains span and trace ids; remove them here as they are already separate columns, e.g.
        #"message\":\"TraceID: 04c707faa29852d058b7ad236b6ef47a SpanID: 7f8791f4ed419539 Get currency data successful\",
#Remove extra beginning
df_t3 = df_t3.with_columns(pl.col("message").str.split_exact(df_t3["SpanID"],1)
.alias("fields")
.struct.rename_fields(["redu1", "message_part"])
).unnest("fields")
#Remove extra end that is in ones coming from df_abnormal_json
df_t3 = df_t3.with_columns(pl.col("message_part").str.split_exact('\",', 1)
.alias("fields")
.struct.rename_fields(["m_message", "redu2"])
).unnest("fields")
        #Strip any extra preceding and trailing characters.
df_t3 = df_t3.with_columns(
pl.col("m_message")
.str.strip_chars_start()
.str.strip_chars_end('\n\\\\\\n'))
#Drop unnecessary columns and merge to main df
        df_t3 = df_t3.drop(["message", "redu1", "redu2", "message_part", "SpanID"]) # normal_json was never selected into df_t3
        self.df = self.df.join(df_t3, on="row_key", how="left")
self.df = self.df.drop(["normal_json"])
    #The epoch timestamps are partly corrupted, so parse_timestamps() below uses the human-readable format instead
    # https://github.com/IntelligentDDS/Nezha/issues/8
def _parse_timestamps_epoch(self):
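        """Epoch-based parsing variant; preprocess() uses parse_timestamps() instead."""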
#Logs
        #Some timestamps start with -6 when they should start with 16
        # https://github.com/IntelligentDDS/Nezha/issues/8
self.df = self.df.with_columns(
m_timestamp=pl.when(pl.col('TimeUnixNano').str.starts_with("-6"))
            .then(pl.col('TimeUnixNano').str.replace(r".$","")) #The -6 lines are one digit too long
.otherwise(pl.col('TimeUnixNano')))
self.df = self.df.with_columns(m_timestamp = pl.col("m_timestamp").str.replace(r"^-6","16"))
self.df = self.df.with_columns(m_timestamp = pl.col("m_timestamp").str.to_integer())
self.df = self.df.with_columns(m_timestamp = pl.from_epoch(pl.col("m_timestamp"), time_unit="ns"))
#Traces
self.df_trace = self.df_trace.with_columns(m_timestamp = pl.from_epoch(pl.col("StartTimeUnixNano"), time_unit="ns"))
self.df_trace = self.df_trace.with_columns(m_timestamp_end = pl.from_epoch(pl.col("EndTimeUnixNano"), time_unit="ns"))
#self.df_metric_default = self.df_metric_default.with_columns(m_timestamp = pl.from_epoch(pl.col("TimeStamp")))
        #Some metric rows have a corrupted unix time, e.g. 1861140279 when it should be 1661140279.
# https://github.com/IntelligentDDS/Nezha/issues/8
self.df_metric_default = self.df_metric_default.with_columns(m_timestamp = pl.col("TimeStamp").str.replace(r"^18","16"))
self.df_metric_default = self.df_metric_default.with_columns(m_timestamp = pl.from_epoch(pl.col("m_timestamp")))
#Labels
self.df_label = self.df_label.with_columns(m_timestamp = pl.from_epoch(pl.col("inject_timestamp"), time_unit="s"))
def parse_timestamps(self):
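        """Parse the human-readable timestamps of logs, metrics and labels, and
        the epoch timestamps of traces, into Datetime columns."""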
        #The epoch fields are partly corrupted, so parse the human-readable timestamps instead
        # https://github.com/IntelligentDDS/Nezha/issues/8
#Logs
self.df = self.df.with_columns(
pl.coalesce(
                # Handling the inconsistent formats:
                # most timestamps look like 2023-01-29T09:33:09.036923751Z
                pl.col('Timestamp').str.strptime(pl.Datetime, "%FT%H:%M:%S%.9fZ",strict=False),
                # while others look like 2023-01-29T09:33:14.716
                pl.col('Timestamp').str.strptime(pl.Datetime, "%FT%H:%M:%S%.3f",strict=False),
).alias("m_timestamp")
)
#Traces. Only Epoch time available
self.df_trace = self.df_trace.with_columns(m_timestamp = pl.from_epoch(pl.col("StartTimeUnixNano"), time_unit="ns"))
self.df_trace = self.df_trace.with_columns(m_timestamp_end = pl.from_epoch(pl.col("EndTimeUnixNano"), time_unit="ns"))
#Metric
self.df_metric_default = self.df_metric_default.with_columns(
m_timestamp = pl.col('Time').str.split(" +0000").list[0]
)
self.df_metric_default = self.df_metric_default.with_columns(
m_timestamp = pl.col('m_timestamp').str.strptime(pl.Datetime, "%F %H:%M:%S%.9f")
)
#Labels
self.df_label = self.df_label.with_columns(m_timestamp = pl.col('inject_time').str.strptime(pl.Datetime, "%F %H:%M:%S"))
def add_labels_to_metrics(self):
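        """Label metric rows against the fault windows: rows fully inside
        [inject+1min, inject+3min] get ano_ratio 1.0, while rows in the ramp-up
        (inject..+1min) and ramp-down (+3min..+4min) get a fractional ratio."""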
# Join labels with metrics
df_metrics_labels = self.df_label.lazy().join(
self.df_metric_default.lazy(),
left_on="inject_pod",
right_on="PodName",
how="inner",
suffix="_metric"
).collect(streaming=True)
# Calculate full anomaly flag
        df_metrics_labels = df_metrics_labels.with_columns(
            ((pl.col("m_timestamp_metric") >= pl.col("m_timestamp+1")) &
             (pl.col("m_timestamp_metric") <= pl.col("m_timestamp+3")))
            .alias("is_full_anomaly")
        )
# Calculate anomaly ratio for early metrics
df_metrics_labels_early = df_metrics_labels.filter(
(pl.col("m_timestamp_metric") > pl.col("m_timestamp")) &
(pl.col("m_timestamp_metric") < pl.col("m_timestamp+1"))
).with_columns(
((pl.col("m_timestamp_metric") - pl.col("m_timestamp")) / pl.duration(minutes=1)).alias("ano_ratio")
)
# Calculate anomaly ratio for late metrics
df_metrics_labels_late = df_metrics_labels.filter(
(pl.col("m_timestamp_metric") > pl.col("m_timestamp+3")) &
(pl.col("m_timestamp_metric") < pl.col("m_timestamp+4"))
).with_columns(
((pl.col("m_timestamp+4") - pl.col("m_timestamp_metric")) / pl.duration(minutes=1)).alias("ano_ratio")
)
# Combine early and late metrics
df_metrics_labels_combined = df_metrics_labels_early.vstack(df_metrics_labels_late)
# Handle full anomalies
df_metrics_labels_full = df_metrics_labels.filter(pl.col("is_full_anomaly")).with_columns(pl.lit(1.0).alias("ano_ratio"))
# Stack all together
df_metrics_labels_final = df_metrics_labels_full.vstack(df_metrics_labels_combined)
# Select relevant columns
df_anomalies = df_metrics_labels_final.select(["row_nr", "is_full_anomaly", "ano_ratio", "inject_type"])
# Join and update the default metrics DataFrame
self.df_metric_default = self.df_metric_default.join(
df_anomalies,
on="row_nr",
how="left"
).with_columns([
pl.col("is_full_anomaly").fill_null(False),
pl.col("ano_ratio").fill_null(0),
((pl.col("inject_type") == "cpu_consumed") |
(pl.col("inject_type") == "network_delay") |
(pl.col("inject_type") == "cpu_contention")).alias("metric_anomaly")
])
def add_labels_to_df(self, df_to_modify):
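        """Mark rows of a log or trace frame as anomalous when their pod and
        timestamp fall inside a fault window (inject..inject+3min)."""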
df_logs_labels_anos = self.df_label.lazy().join(
df_to_modify.lazy(),
left_on="inject_pod",
right_on="PodName",
how="inner",
suffix="_log"
).filter(
(pl.col("m_timestamp_log") > pl.col("m_timestamp")) &
(pl.col("m_timestamp_log") <= pl.col("m_timestamp+3"))
).collect(streaming=True)
df_logs_labels_anos = df_logs_labels_anos.with_columns(pl.lit(True).alias("anomaly"))
df_logs_labels_anos = df_logs_labels_anos.select(["row_nr", "anomaly", "inject_type"])
df_to_modify = df_to_modify.join(
df_logs_labels_anos,
on="row_nr",
how="left"
)
df_to_modify = df_to_modify.with_columns(pl.col("anomaly").fill_null(False))
        df_to_modify = df_to_modify.with_columns(
            pl.col("inject_type").is_in(["cpu_consumed", "network_delay", "cpu_contention"]).alias("metric_anomaly")
        )
return df_to_modify
def save_processed_data(self, path):
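        """Pickle the processed log, metric, trace and label frames under path."""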
with open(f"{path}/log_processed.pkl", "wb") as f:
pickle.dump(self.df, f)
with open(f"{path}/metric_processed.pkl", "wb") as f:
pickle.dump(self.df_metric_default, f)
with open(f"{path}/trace_processed.pkl", "wb") as f:
pickle.dump(self.df_trace, f)
with open(f"{path}/label_processed.pkl", "wb") as f:
pickle.dump(self.df_label, f)
def load_processed_data(self, path):
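        """Restore the frames written by save_processed_data()."""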
with open(f"{path}/log_processed.pkl", "rb") as f:
self.df = pickle.load(f)
with open(f"{path}/metric_processed.pkl", "rb") as f:
self.df_metric_default = pickle.load(f)
with open(f"{path}/trace_processed.pkl", "rb") as f:
self.df_trace = pickle.load(f)
with open(f"{path}/label_processed.pkl", "rb") as f:
self.df_label = pickle.load(f)
# full_data = "/home/mmantyla/Datasets"
# loader = NezhaLoader(filename= f"{full_data}/nezha/",)
# df = loader.execute()
# loader.df.describe()
# loader.df_trace.describe()
# loader.df_metric_default.describe()
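# A fuller sketch (the path and time window below are illustrative, not from the dataset):
# loader = NezhaLoader(filename="/path/to/nezha/")
# loader.load()
# loader.preprocess()
# df_logs_5min = loader.get_data_at_time(
#     "2023-01-29T09:33:09.036923751Z", time_window={"minutes": 5}, data_type="log")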