# aiops/ContrastiveLearningLogClustering/utils/datasets.py
import re
import time
import pandas as pd
import os
from itertools import combinations
import math
import random
from utils.preprocess import *  # provides add_var_token, used throughout this module
from torch.utils.data import Dataset
from collections import OrderedDict
from sentence_transformers import InputExample
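# Per-dataset parsing configuration for the 2k-line benchmark logs expected under ./logs/:
#   log_file            path of the raw log relative to ./logs/
#   log_format          '<Field>' template used to split each raw line into columns (see generate_logformat_regex below)
#   regex               patterns whose matches are treated as variable fields (masked via add_var_token)
#   description         human-readable dataset name
#   distance_threshold  per-dataset clustering threshold consumed elsewhere in the repository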
benchmark_settings = {
'HDFS': {
'log_file': 'HDFS/HDFS_2k.log',
'log_format': '<Date> <Time> <Pid> <Level> <Component>: <Content>',
'regex': [r'blk_-?\d+', r'(\d+\.){3}\d+(:\d+)?'],
'description': 'Hadoop distributed file system log',
'distance_threshold': 0.005,
},
'Hadoop': {
'log_file': 'Hadoop/Hadoop_2k.log',
        'log_format': r'<Date> <Time> <Level> \[<Process>\] <Component>: <Content>',
'regex': [r'(\d+\.){3}\d+'],
'description': 'Hadoop mapreduce job log',
'distance_threshold': 0.08,
},
'Spark': {
'log_file': 'Spark/Spark_2k.log',
'log_format': '<Date> <Time> <Level> <Component>: <Content>',
'regex': [r'(\d+\.){3}\d+', r'\b[KGTM]?B\b', r'([\w-]+\.){2,}[\w-]+'],
'description': 'Spark job log',
'distance_threshold': 0.05,
},
'Zookeeper': {
'log_file': 'Zookeeper/Zookeeper_2k.log',
        'log_format': r'<Date> <Time> - <Level> \[<Node>:<Component>@<Id>\] - <Content>',
'regex': [r'(/|)(\d+\.){3}\d+(:\d+)?'],
'description': 'ZooKeeper service log',
'distance_threshold': 0.03,
},
'BGL': {
'log_file': 'BGL/BGL_2k.log',
'log_format': '<Label> <Timestamp> <Date> <Node> <Time> <NodeRepeat> <Type> <Component> <Level> <Content>',
'regex': [r'core\.\d+'],
'description': 'Blue Gene/L supercomputer log',
'distance_threshold': 0.07,
},
'HPC': {
'log_file': 'HPC/HPC_2k.log',
'log_format': '<LogId> <Node> <Component> <State> <Time> <Flag> <Content>',
'regex': [r'=\d+'],
'description': 'High performance cluster log',
'distance_threshold': 0.06,
},
'Thunderbird': {
'log_file': 'Thunderbird/Thunderbird_2k.log',
        'log_format': r'<Label> <Timestamp> <Date> <User> <Month> <Day> <Time> <Location> <Component>(\[<PID>\])?: <Content>',
'regex': [r'(\d+\.){3}\d+'],
'description': 'Thunderbird supercomputer log',
'distance_threshold': 0.18,
},
'Windows': {
'log_file': 'Windows/Windows_2k.log',
'log_format': '<Date> <Time>, <Level> <Component> <Content>',
'regex': [r'0x.*?\s'],
'description': 'Windows event log',
'distance_threshold': 0.04,
},
'Linux': {
'log_file': 'Linux/Linux_2k.log',
        'log_format': r'<Month> <Date> <Time> <Level> <Component>(\[<PID>\])?: <Content>',
'regex': [r'(\d+\.){3}\d+', r'\d{2}:\d{2}:\d{2}'],
'description': 'Linux system log',
'distance_threshold': 0.14,
},
'Andriod': {
'log_file': 'Andriod/Andriod_2k.log',
'log_format': '<Date> <Time> <Pid> <Tid> <Level> <Component>: <Content>',
'regex': [r'(/[\w-]+)+', r'([\w-]+\.){2,}[\w-]+', r'\b(\-?\+?\d+)\b|\b0[Xx][a-fA-F\d]+\b|\b[a-fA-F\d]{4,}\b'],
'description': 'Android framework log',
'distance_threshold': 0.02,
},
'HealthApp': {
'log_file': 'HealthApp/HealthApp_2k.log',
        'log_format': r'<Time>\|<Component>\|<Pid>\|<Content>',
'regex': [],
'description': 'Health app log',
'distance_threshold': 0.04,
},
'Apache': {
'log_file': 'Apache/Apache_2k.log',
        'log_format': r'\[<Time>\] \[<Level>\] <Content>',
'regex': [r'(\d+\.){3}\d+'],
'description': 'Apache web server error log',
'distance_threshold': 0.01,
},
'Proxifier': {
'log_file': 'Proxifier/Proxifier_2k.log',
        'log_format': r'\[<Time>\] <Program> - <Content>',
'regex': [r'<\d+\ssec', r'([\w-]+\.)+[\w-]+(:\d+)?', r'\d{2}:\d{2}(:\d{2})*', r'[KGTM]B'],
'description': 'Proxifier software log',
'distance_threshold': 0.04,
},
'OpenSSH': {
'log_file': 'OpenSSH/OpenSSH_2k.log',
        'log_format': r'<Date> <Day> <Time> <Component> sshd\[<Pid>\]: <Content>',
'regex': [r'(\d+\.){3}\d+', r'([\w-]+\.){2,}[\w-]+'],
'description': 'OpenSSH server log',
'distance_threshold': 0.002,
},
'OpenStack': {
'log_file': 'OpenStack/OpenStack_2k.log',
        'log_format': r'<Logrecord> <Date> <Time> <Pid> <Level> <Component> \[<ADDR>\] <Content>',
'regex': [r'((\d+\.){3}\d+,?)+', r'/.+?\s', r'\d+'],
'description': 'OpenStack infrastructure log',
'distance_threshold': 0.02,
},
'Mac': {
'log_file': 'Mac/Mac_2k.log',
        'log_format': r'<Month> <Date> <Time> <User> <Component>\[<PID>\]( \(<Address>\))?: <Content>',
'regex': [r'([\w-]+\.){2,}[\w-]+'],
'description': 'Mac OS log',
'distance_threshold': 0.05,
}
}
def generate_logformat_regex(logformat):
    """Build (headers, compiled_regex) from a '<Field>' log format string."""
    headers = []
    splitters = re.split(r'(<[^<>]+>)', logformat)
    regex = ''
    for k in range(len(splitters)):
        if k % 2 == 0:
            # literal separator text: collapse runs of spaces into a flexible \s+ match
            splitter = re.sub(' +', r'\\s+', splitters[k])
            regex += splitter
        else:
            # a '<Header>' placeholder becomes a named capture group
            header = splitters[k].strip('<').strip('>')
            regex += '(?P<%s>.*?)' % header
            headers.append(header)
    regex = re.compile('^' + regex + '$')
    return headers, regex
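# Illustrative example (not executed at import time): for the HDFS format above,
#   headers, regex = generate_logformat_regex(benchmark_settings['HDFS']['log_format'])
# yields headers == ['Date', 'Time', 'Pid', 'Level', 'Component', 'Content'] and a pattern of the form
#   '^(?P<Date>.*?)\s+(?P<Time>.*?)\s+(?P<Pid>.*?)\s+(?P<Level>.*?)\s+(?P<Component>.*?):\s+(?P<Content>.*?)$'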
def log_to_dataframe(log_file, regex, headers, logformat, max_len=10000000):
    """Parse a raw log file into a DataFrame with one column per header plus a LineId column."""
    log_messages = []
    linecount = 0
    with open(log_file, 'r', errors='ignore') as fin:
        for line in fin:
            try:
                match = regex.search(line.strip())
                message = [match.group(header) for header in headers]
                log_messages.append(message)
                linecount += 1
                if linecount >= max_len:
                    print("The number of lines of logs exceeds ", max_len)
                    break
            except Exception:
                # lines that do not match the format regex are silently skipped
                pass
    logdf = pd.DataFrame(log_messages, columns=headers)
    logdf.insert(0, 'LineId', None)
    logdf['LineId'] = [i + 1 for i in range(linecount)]
    return logdf
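# Sketch of typical usage (assumes the benchmark ./logs/ directory is present):
#   headers, regex = generate_logformat_regex(benchmark_settings['HDFS']['log_format'])
#   df = log_to_dataframe('./logs/HDFS/HDFS_2k.log', regex, headers, benchmark_settings['HDFS']['log_format'])
#   # df columns: ['LineId', 'Date', 'Time', 'Pid', 'Level', 'Component', 'Content']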
def load_train_log(test_log_type=None, benchmark_settings={}):
    """Load and preprocess the Content column of every benchmark dataset except test_log_type.

    Returns a single pandas Series of log messages with variable fields masked by add_var_token.
    """
    df_log = None
    for log_type in benchmark_settings:
        if log_type != test_log_type:
            log_format = benchmark_settings[log_type]['log_format']
            headers, regex = generate_logformat_regex(log_format)
            df_data = log_to_dataframe(os.path.join("./logs/" + log_type + "/", log_type + "_2k.log"), regex, headers, log_format)
            log_rex = benchmark_settings[log_type]['regex']
            df_data['Content'] = df_data['Content'].apply(lambda x: add_var_token(log_rex, x))
            if df_log is None:
                df_log = df_data['Content']
            else:
                df_log = pd.concat([df_log, df_data['Content']], ignore_index=True)
    return df_log
def load_test_log(log_type, benchmark_settings):
    """Load the 2k benchmark file for log_type and return (structured DataFrame, list of preprocessed messages)."""
    log_format = benchmark_settings[log_type]['log_format']
    headers, regex = generate_logformat_regex(log_format)
    df_log = log_to_dataframe(os.path.join("./logs/" + log_type + "/", log_type + "_2k.log"), regex, headers, log_format)
    log_rex = benchmark_settings[log_type]['regex']
    df_log['Content'] = df_log['Content'].apply(lambda x: add_var_token(log_rex, x))
    corpus = []
    for idx, line in df_log.iterrows():
        corpus.append(line['Content'])
    return df_log, corpus
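# Sketch (assumes ./logs/<dataset>/<dataset>_2k.log files exist):
#   train_series = load_train_log(test_log_type='HDFS', benchmark_settings=benchmark_settings)  # every dataset except HDFS
#   df_test, test_corpus = load_test_log('HDFS', benchmark_settings)  # corpus entries already have variables masked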
def generate_positive_samples(test_log_type=None, benchmark_settings=None):
    """Collect positive (same-template) message pairs for every dataset except test_log_type.

    Returns (positive_corpus, all_event, positive_samples); see the structure sketch after this function.
    """
    positive_samples = {}
for log_type in benchmark_settings:
if log_type != test_log_type and log_type!='industrial1' and log_type!='industrial2':
df_log_structured = pd.read_csv("./logs/"+log_type+"/"+log_type+"_2k.log_structured.csv")
df_log_template = pd.read_csv("./logs/"+log_type+"/"+log_type+"_2k.log_templates.csv")
df_log_template = df_log_template.drop_duplicates(subset=['EventId'])
samples = {}
for idx, line in df_log_template.iterrows():
temp_id = line['EventId']
temp_log = df_log_structured[df_log_structured['EventId']==temp_id]
temp_log = temp_log['Content'].to_list()
log_rex = benchmark_settings[log_type]['regex']
temp_log = [(add_var_token(log_rex,s)) for s in temp_log]
if len(temp_log)>=2:
for pairs in combinations(temp_log,2):
if pairs[0]!=pairs[1]:
if not temp_id in samples.keys():
samples[temp_id]=OrderedDict()
# pairs = tuple(pairs)
reverse_pairs = tuple([pairs[1],pairs[0]])
if not (pairs in samples[temp_id]) and not (reverse_pairs in samples[temp_id]):
samples[temp_id][pairs] = 0
if temp_id in samples.keys():
samples[temp_id] = list(samples[temp_id].keys())
positive_samples[log_type] = samples
if test_log_type=='industrial1' or test_log_type=='industrial2':
log_type = 'industrial1' if test_log_type=='industrial2' else 'industrial2'
# print("Loading",log_type,"positive pairs...")
log_path = './'+log_type.lower()+'_test.csv'
df_log = pd.read_csv(log_path)
df_labeled = df_log[df_log['label_id']!=-1]
positive_event = list(df_labeled['label_id'].value_counts().index)
samples = {}
for temp_id in positive_event:
df_temp = df_labeled[df_labeled['label_id']==temp_id]
df_temp = df_temp.sample(frac=1.0, random_state=42)
temp_log = df_temp['Content'].to_list()
if len(temp_log)>=2:
for pairs in combinations(temp_log,2):
if pairs[0]!=pairs[1]:
if not temp_id in samples.keys():
samples[temp_id]=OrderedDict()
reverse_pairs = tuple([pairs[1],pairs[0]])
if not (pairs in samples[temp_id]) and not (reverse_pairs in samples[temp_id]):
samples[temp_id][pairs] = 0
if temp_id in samples.keys():
samples[temp_id] = list(samples[temp_id].keys())
positive_samples[log_type] = samples
positive_corpus = []
all_event = OrderedDict()
for d in positive_samples:
# print(d)
for e in positive_samples[d]:
all_event[(d,e)] = len(positive_samples[d][e])
for pairs in positive_samples[d][e]:
# print(i)
positive_corpus.append(pairs)
return positive_corpus, all_event, positive_samples
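# Shape of the structures returned by generate_positive_samples above (contents hypothetical):
#   positive_samples: {dataset: {EventId: [(msg_a, msg_b), ...]}}   # unordered pairs from the same template
#   all_event:        {(dataset, EventId): number_of_pairs}
#   positive_corpus:  flat list of all pairs across datasets and events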
def generate_neutral_samples(test_log_type=None, positive_corpus=[], benchmark_settings={}):
    """Sample 'neutral' pairs: messages from two different templates within the same dataset."""
    neutral_corpus = set()
    neutral_nums = 150000
    # budget per dataset (16 benchmark datasets in total, one held out when test_log_type is set)
    sub_nums = int(neutral_nums / 16) if test_log_type is None else int(neutral_nums / 15)
for log_type in benchmark_settings:
if log_type != test_log_type:
# print(log_type)
df_log_structured = pd.read_csv("./logs/"+log_type+"/"+log_type+"_2k.log_structured.csv")
df_log_template = pd.read_csv("./logs/"+log_type+"/"+log_type+"_2k.log_templates.csv")
df_log_template = df_log_template.drop_duplicates(subset=['EventId'])
samples = {}
dataset_event = []
for idx, line in df_log_template.iterrows():
dataset_event.append(line['EventId'])
subsub_nums = int(sub_nums/math.comb(len(dataset_event),2))
for event_pairs in combinations(dataset_event,2):
pair1_corpus = []
pair2_corpus = []
for idx, line in df_log_structured[df_log_structured['EventId']==event_pairs[0]].iterrows():
temp_log = line['Content']
pair1_corpus.append(temp_log)
for idx, line in df_log_structured[df_log_structured['EventId']==event_pairs[1]].iterrows():
temp_log = line['Content']
pair2_corpus.append(temp_log)
log_rex = benchmark_settings[log_type]['regex']
pair1_corpus = [add_var_token(log_rex,s) for s in pair1_corpus]
pair2_corpus = [add_var_token(log_rex,s) for s in pair2_corpus]
random.shuffle(pair1_corpus)
random.shuffle(pair2_corpus)
count = 0
for i in range(len(pair1_corpus)):
for j in range(len(pair2_corpus)):
pairs = [pair1_corpus[i],pair2_corpus[j]]
reverse_pairs = pairs[::-1]
pairs = tuple(pairs)
reverse_pairs = tuple(reverse_pairs)
if pairs[0]!=pairs[1] and not (pairs in neutral_corpus) and not (reverse_pairs in neutral_corpus):
neutral_corpus.add(pairs)
count += 1
if count>=subsub_nums:
break
if count>=subsub_nums:
break
return list(neutral_corpus)
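# The result is a list of (msg_a, msg_b) tuples, capped at roughly 150000 in total and spread
# evenly over datasets and template pairs; both messages are preprocessed with add_var_token.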
def generate_negetive_samples(test_log_type=None, positive_corpus=[], neutral_corpus=[], benchmark_settings={}):
    """Sample negative pairs: messages drawn from two different datasets."""
    all_dataset = []
    for log_type in benchmark_settings:
        if log_type != test_log_type:
            all_dataset.append(log_type)
    random.seed(42)
    negetive_corpus = set()
    negetive_nums = 160000
    # budget per dataset pair
    sub_nums = int(negetive_nums / math.comb(len(all_dataset), 2))
    for dataset_pairs in combinations(all_dataset, 2):
        index = 0  # restart the sampling cursor for each dataset pair
pair1_corpus = []
pair2_corpus = []
df_log_structured = pd.read_csv("./logs/"+dataset_pairs[0]+"/"+dataset_pairs[0]+"_2k.log_structured.csv")
for idx, line in df_log_structured.iterrows():
temp_log = line['Content']
pair1_corpus.append(temp_log)
log_rex = benchmark_settings[dataset_pairs[0]]['regex']
pair1_corpus = [add_var_token(log_rex,s) for s in pair1_corpus]
df_log_structured = pd.read_csv("./logs/"+dataset_pairs[1]+"/"+dataset_pairs[1]+"_2k.log_structured.csv")
for idx, line in df_log_structured.iterrows():
temp_log = line['Content']
pair2_corpus.append(temp_log)
log_rex = benchmark_settings[dataset_pairs[1]]['regex']
pair2_corpus = [add_var_token(log_rex,s) for s in pair2_corpus]
random.shuffle(pair1_corpus)
random.shuffle(pair2_corpus)
count = 0
while count<sub_nums:
pairs = [pair1_corpus[index],pair2_corpus[index]]
reverse_pairs = pairs[::-1]
pairs = tuple(pairs)
reverse_pairs = tuple(reverse_pairs)
if pairs[0]!=pairs[1] and not (pairs in negetive_corpus) and not (reverse_pairs in negetive_corpus):
negetive_corpus.add(pairs)
count += 1
index += 1
if index>=len(pair1_corpus) or index>=len(pair2_corpus):
index = 0
random.shuffle(pair1_corpus)
random.shuffle(pair2_corpus)
return list(negetive_corpus)
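# Roughly 160000 cross-dataset tuples are drawn in total, split evenly across all dataset
# combinations; as with the neutral pairs, both messages are preprocessed with add_var_token.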
def generate_contrastive_samples(positive_samples, all_event, batch_size, max_len=100000):
    """Arrange positive pairs so that each slice of batch_size consecutive pairs covers batch_size distinct events.

    Note: this consumes (mutates) both positive_samples and all_event.
    """
    remain_event = all_event
    contrastive_corpus = []
while len(remain_event)>=batch_size:
event_list = list(remain_event.keys())
random.shuffle(event_list)
# print(event_list)
for i in range(len(event_list)//batch_size):
event_pairs = event_list[i*batch_size:i*batch_size+batch_size]
for event in event_pairs:
positive_pair = positive_samples[event[0]][event[1]][0]
positive_samples[event[0]][event[1]] = positive_samples[event[0]][event[1]][1:]
contrastive_corpus.append(positive_pair)
remain_event[event] -= 1
if remain_event[event]==0:
del remain_event[event]
if len(contrastive_corpus)>=max_len:
break
if len(contrastive_corpus)>=max_len:
break
return contrastive_corpus
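# The batch-wise arrangement above is presumably meant for in-batch contrastive training: because each
# batch_size-sized slice covers batch_size distinct (dataset, EventId) events, the remaining pairs in a
# batch can act as negatives. A minimal sketch of how it is consumed (sizes hypothetical):
#   corpus = generate_contrastive_samples(positive_samples, all_event, batch_size=16, max_len=10000)
#   batch0 = corpus[0:16]   # 16 positive pairs from 16 different events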
def generate_contrastive_samples2(positive_samples, all_event, batch_size, max_len=100000):
    """Variant of generate_contrastive_samples that cycles through each event's pairs instead of consuming them."""
    contrastive_corpus = []
    event_list = list(all_event.keys())
    positive_index = dict.fromkeys(event_list, 0)
    random.seed(42)
while len(contrastive_corpus)<=max_len:
random.shuffle(event_list)
for i in range(len(event_list)//batch_size):
events_in_batch = event_list[i*batch_size:i*batch_size+batch_size]
for event in events_in_batch:
positive_pair = positive_samples[event[0]][event[1]][positive_index[event]]
positive_index[event] += 1
if positive_index[event] >= len(positive_samples[event[0]][event[1]]):
positive_index[event] = 0
contrastive_corpus.append(positive_pair)
if len(contrastive_corpus)>=max_len:
break
return contrastive_corpus
def industry_positive_samples(log_path, batch_size):
    """Build positive pairs from a labeled industrial CSV (columns 'Content' and 'label_id'; -1 means unlabeled)."""
    df_log = pd.read_csv(log_path)
    df_labeled = df_log[df_log['label_id'] != -1]
    log_select_num = 4  # at most 4 messages per label are paired
    positive_event = list(df_labeled['label_id'].value_counts().index)
    positive_samples = {}
    samples = {}
for temp_id in positive_event:
# if len(samples)>=batch_size:
# break
df_temp = df_labeled[df_labeled['label_id']==temp_id]
df_temp = df_temp.sample(frac=1.0, random_state=42)
temp_log = df_temp['Content'].iloc[:log_select_num].to_list()
# temp_log = df_temp['Content'].to_list()
if len(temp_log)>=2:
for pairs in combinations(temp_log,2):
if pairs[0]!=pairs[1]:
if not temp_id in samples.keys():
samples[temp_id]=set()
# pairs = tuple(pairs)
reverse_pairs = tuple([pairs[1],pairs[0]])
# if not (pairs in samples[temp_id]) and not (reverse_pairs in samples[temp_id]):
# samples[temp_id].add(pairs)
samples[temp_id].add(pairs)
if temp_id in samples.keys():
samples[temp_id] = list(samples[temp_id])
    if len(samples) < batch_size:
        print("Positive samples len:", len(samples))
        raise ValueError("Cannot generate enough positive samples!")
positive_samples['industry'] = samples
positive_corpus = []
all_event = {}
for d in positive_samples:
# print(d)
for e in positive_samples[d]:
all_event[(d,e)] = len(positive_samples[d][e])
for pairs in positive_samples[d][e]:
# print(i)
positive_corpus.append(pairs)
return positive_corpus, all_event, positive_samples
def load_event_log(test_log_type=None, benchmark_settings=None, model=None):
    """Group preprocessed messages by event template and map each tokenized message back to its event id.

    `model` is expected to expose a SentenceTransformer-style tokenize() returning 'input_ids' and 'attention_mask'.
    """
    all_event_log = {}
    log_to_event = {}
for log_type in benchmark_settings:
if log_type != test_log_type and log_type!='industrial1' and log_type!='industrial2':
# if log_type == 'HPC':
df_log_structured = pd.read_csv("./logs/"+log_type+"/"+log_type+"_2k.log_structured.csv")
df_log_template = pd.read_csv("./logs/"+log_type+"/"+log_type+"_2k.log_templates.csv")
df_log_template = df_log_template.drop_duplicates(subset=['EventId'])
for idx, line in df_log_template.iterrows():
temp_id = line['EventId']
temp_log = df_log_structured[df_log_structured['EventId']==temp_id]
temp_log = temp_log['Content'].to_list()
# temp_log = [add_blank_token(s) for s in temp_log]
log_rex = benchmark_settings[log_type]['regex']
temp_log = [(add_var_token(log_rex,s)) for s in temp_log]
# temp_log = [(add_var_token(log_rex,s)) for s in temp_log]
event_id = log_type+temp_id
all_event_log[event_id] = temp_log
log_tokens = model.tokenize(temp_log)
for i in range(len(log_tokens['input_ids'])):
log_token = log_tokens['input_ids'][i].cpu().numpy()
token_mask = log_tokens['attention_mask'][i].cpu().numpy()
log_token = log_token[token_mask!=0]
log_to_event[tuple(log_token.tolist())] = event_id
if test_log_type=='industrial1' or test_log_type=='industrial2':
log_type = 'industrial1' if test_log_type=='industrial2' else 'industrial2'
print("Loading",log_type,"event...")
log_path = './'+log_type.lower()+'_test.csv'
df_log = pd.read_csv(log_path)
df_labeled = df_log[df_log['label_id']!=-1]
label_ids = df_labeled['label_id'].unique()
for temp_id in label_ids:
temp_log = df_labeled[df_labeled['label_id']==temp_id]
temp_log = temp_log['Content'].to_list()
all_event_log[temp_id] = temp_log
log_tokens = model.tokenize(temp_log)
for i in range(len(log_tokens['input_ids'])):
log_token = log_tokens['input_ids'][i].cpu().numpy()
token_mask = log_tokens['attention_mask'][i].cpu().numpy()
log_token = log_token[token_mask!=0]
log_to_event[tuple(log_token.tolist())] = temp_id
return all_event_log, log_to_event
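# log_to_event keys are tuples of non-padded token ids, mirroring the loop above (values hypothetical):
#   ids = log_tokens['input_ids'][0].cpu().numpy()
#   mask = log_tokens['attention_mask'][0].cpu().numpy()
#   key = tuple(ids[mask != 0].tolist())
#   log_to_event[key] -> 'HDFSE5'   # benchmark event ids are dataset name + EventId
# presumably so that tokenized inputs can later be mapped back to their ground-truth event.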
def load_event_log_industrial(log_path, model=None):
    """Same as the industrial branch of load_event_log, but reads the labeled CSV directly from log_path."""
    all_event_log = {}
    log_to_event = {}
df_log = pd.read_csv(log_path)
df_labeled = df_log[df_log['label_id']!=-1]
label_ids = df_labeled['label_id'].unique()
# print(label_ids)
for temp_id in label_ids:
temp_log = df_labeled[df_labeled['label_id']==temp_id]
temp_log = temp_log['Content'].to_list()
all_event_log[temp_id] = temp_log
log_tokens = model.tokenize(temp_log)
for i in range(len(log_tokens['input_ids'])):
log_token = log_tokens['input_ids'][i].cpu().numpy()
token_mask = log_tokens['attention_mask'][i].cpu().numpy()
log_token = log_token[token_mask!=0]
log_to_event[tuple(log_token.tolist())] = temp_id
return all_event_log, log_to_event
def load_industry_log(file_path):
    """Load a labeled industrial CSV and return (full DataFrame, list of labeled Content strings)."""
    df_log = pd.read_csv(file_path)
    df_labeled = df_log[df_log['label_id'] != -1]
    label_count = df_labeled['label_id'].value_counts()
    print('label type amount:', len(label_count))
    corpus = []
    for idx, line in df_labeled.iterrows():
        log_temp = line['Content']
        try:
            log_temp.lower()  # sanity check only: fails when Content is missing (NaN/None)
        except Exception:
            print('label_id:', line['label_id'])
            print('index:', idx)
            print('log:', log_temp)
            print("This preprocessed log is Null!")
        corpus.append(log_temp)
    return df_log, corpus
def generate_samples(sample_len, test_log_type, batch_size):
    """End-to-end helper: build positive pairs, arrange them into event-disjoint batches, shuffle the
    batches, and wrap each pair as a sentence-transformers InputExample."""
    positive_corpus, all_event, positive_samples = generate_positive_samples(test_log_type=test_log_type, benchmark_settings=benchmark_settings)
    contrastive_corpus = generate_contrastive_samples(positive_samples, all_event, batch_size, max_len=sample_len)
random_index = [i for i in range(len(contrastive_corpus)//batch_size)]
random.shuffle(random_index)
train_corpus = []
for i in random_index:
train_corpus.append(contrastive_corpus[i*batch_size:i*batch_size+batch_size])
train_examples = []
for batch_corpus in train_corpus:
for pairs in batch_corpus:
train_examples.append(InputExample(texts=list(pairs)))
return train_examples
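# A minimal, hypothetical training sketch using the examples produced above. The repository's actual
# training script may differ; MultipleNegativesRankingLoss and the model name are assumptions that fit
# the event-disjoint batch layout built by generate_samples:
#   from torch.utils.data import DataLoader
#   from sentence_transformers import SentenceTransformer, losses
#   model = SentenceTransformer('all-MiniLM-L6-v2')
#   train_examples = generate_samples(sample_len=100000, test_log_type='HDFS', batch_size=16)
#   train_dataloader = DataLoader(train_examples, shuffle=False, batch_size=16)  # keep the pre-built batch order
#   train_loss = losses.MultipleNegativesRankingLoss(model)
#   model.fit(train_objectives=[(train_dataloader, train_loss)], epochs=1)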
class Log_Dataset(Dataset):
    """Minimal torch Dataset over a DataFrame with 'Content' and 'Label' columns."""
    def __init__(self, df_train) -> None:
        self.data = df_train

    def __getitem__(self, index):
        return self.data['Content'].iloc[index], self.data['Label'].iloc[index]

    def __len__(self):
        return len(self.data)
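# Hypothetical usage of Log_Dataset (assumes df_train has 'Content' and 'Label' columns):
#   from torch.utils.data import DataLoader
#   loader = DataLoader(Log_Dataset(df_train), batch_size=32, shuffle=True)
#   for contents, labels in loader:
#       ...  # contents is a batch of strings, labels the corresponding labels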