aiops/AnomalyDetection/model/eval_methods.py (129 lines of code) (raw):

# -*- coding: utf-8 -*-
# Evaluation helpers for anomaly detection: point-wise precision/recall/F1,
# segment-level ("point-adjusted") prediction fix-up, and threshold search.
import numpy as np

# matplotlib and sklearn are not referenced by any function in this module.
# The imports are kept so the names stay importable from here, but they are
# optional: a missing plotting/ML stack must not break the metric helpers.
try:
    import matplotlib.pyplot as plt
except ImportError:
    plt = None
try:
    from sklearn.metrics import auc
except ImportError:
    auc = None


def calc_point2point(predict, actual):
    """Compute point-wise F1, precision and recall from binary labels.

    Args:
        predict: predicted labels (0/1 or bool), array-like.
        actual: ground-truth labels (0/1 or bool), array-like.

    Returns:
        tuple: ``(f1, precision, recall, TP, TN, FP, FN)``. A constant
        ``0.00001`` is added to every denominator so that empty classes do
        not cause a division by zero; the ratios are therefore marginally
        below their exact values.
    """
    # Accept plain Python sequences as well as arrays (``1 - list`` fails).
    predict = np.asarray(predict)
    actual = np.asarray(actual)

    not_predict = 1 - predict
    not_actual = 1 - actual

    TP = np.sum(predict * actual)
    TN = np.sum(not_predict * not_actual)
    FP = np.sum(predict * not_actual)
    FN = np.sum(not_predict * actual)

    precision = TP / (TP + FP + 0.00001)
    recall = TP / (TP + FN + 0.00001)
    f1 = 2 * precision * recall / (precision + recall + 0.00001)
    return f1, precision, recall, TP, TN, FP, FN


def adjust_predicts(score, label, threshold=None, pred=None, calc_latency=False):
    """Calculate point-adjusted predicted labels.

    If any point inside a contiguous ground-truth anomaly segment is
    predicted as anomalous, every point of that segment is marked as
    predicted (both before and after the first detection).

    Args:
        score: anomaly scores, array-like.
        label: ground-truth labels, array-like; values ``> 0.1`` are
            treated as anomalous.
        threshold (float): a point is predicted anomalous when its score
            is strictly lower than ``threshold``. Only used when ``pred``
            is None.
        pred: optional precomputed predictions. When given, ``threshold``
            is ignored and the scores are not compared (``score`` is still
            used for the length check and the iteration length). The
            caller's object is not modified; a copy is adjusted.
        calc_latency (bool): when True, also return the detection latency.

    Returns:
        np.ndarray: the adjusted predictions, or, when ``calc_latency`` is
        True, a tuple ``(predictions, latency)`` where ``latency`` is the
        number of back-filled points divided by ``(number of detected
        segments + 1e-4)``.

    Raises:
        ValueError: if ``score`` and ``label`` have different lengths.
    """
    if len(score) != len(label):
        raise ValueError("score and label must have the same length")
    score = np.asarray(score)
    label = np.asarray(label)

    if pred is None:
        predict = score < threshold
    else:
        # Work on a copy: the adjustment below writes into ``predict`` and
        # must not silently alter the caller's array.
        predict = np.array(pred)
    actual = label > 0.1

    latency = 0
    anomaly_state = False
    anomaly_count = 0
    for i in range(len(score)):
        if actual[i] and predict[i] and not anomaly_state:
            # First detection inside a ground-truth segment.
            anomaly_state = True
            anomaly_count += 1
            # Walk back to the start of the segment, *including index 0*,
            # marking the points that were missed before the detection.
            for j in range(i, -1, -1):
                if not actual[j]:
                    break
                if not predict[j]:
                    predict[j] = True
                    latency += 1
        elif not actual[i]:
            anomaly_state = False
        if anomaly_state:
            # Remainder of an already-detected segment.
            predict[i] = True

    if calc_latency:
        return predict, latency / (anomaly_count + 1e-4)
    return predict


def calc_seq(score, label, threshold, calc_latency=False):
    """Calculate point-adjusted metrics for a score sequence.

    Args:
        score: anomaly scores, array-like.
        label: ground-truth labels, array-like.
        threshold (float): scores strictly below it are predicted anomalous.
        calc_latency (bool): whether to append the detection latency.

    Returns:
        The ``calc_point2point`` tuple ``(f1, precision, recall, TP, TN,
        FP, FN)``; when ``calc_latency`` is True, a list with the same
        seven values followed by the latency.
    """
    if not calc_latency:
        predict = adjust_predicts(score, label, threshold, calc_latency=calc_latency)
        return calc_point2point(predict, label)

    predict, latency = adjust_predicts(score, label, threshold, calc_latency=calc_latency)
    metrics = list(calc_point2point(predict, label))
    metrics.append(latency)
    return metrics


def bf_search(score, label, start, end=None, step_num=1, display_freq=1, verbose=True):
    """Brute-force search for the threshold with the best adjusted F1.

    The candidates are ``start + k * (end - start) / step_num`` for
    ``k = 1 .. step_num``. When ``end`` or ``step_num`` is None, only
    ``start`` itself is evaluated.

    Args:
        score: anomaly scores, array-like.
        label: ground-truth labels, array-like.
        start (float): lower bound of the search range.
        end (float or None): upper bound of the search range.
        step_num (int or None): number of candidate thresholds.
        display_freq (int): print progress every ``display_freq`` steps
            (only when ``verbose`` is True).
        verbose (bool): print the search range and progress.

    Returns:
        tuple: ``(best_metrics, best_threshold)`` where ``best_metrics`` is
        the ``calc_seq(..., calc_latency=True)`` result for the best
        threshold. Ties keep the first (lowest) threshold. The final result
        is printed regardless of ``verbose``.
    """
    if step_num is None or end is None:
        end = start
        step_num = 1
    search_range = end - start
    if verbose:
        print("search range: ", start, start + search_range)

    threshold = start
    m = (-1., -1., -1.)
    m_t = 0.0
    for i in range(step_num):
        threshold += search_range / float(step_num)
        target = calc_seq(score, label, threshold, calc_latency=True)
        if target[0] > m[0]:
            m_t = threshold
            m = target
        if verbose and i % display_freq == 0:
            print("cur thr: ", threshold, target, m, m_t)
    print(m, m_t)
    return m, m_t


def searchThreshold(score, label):
    """Sweep thresholds over the score range and report the best adjusted F1.

    The range ``[min(score), max(score)]`` is divided into 1000 steps; each
    candidate threshold is evaluated with ``adjust_predicts`` and
    ``calc_point2point``. Precision/recall pairs from the sweep are also
    accumulated into a left-Riemann-sum area under the precision-recall
    curve.

    Args:
        score: anomaly scores, array-like.
        label: ground-truth labels, array-like.

    Returns:
        dict: the best ``pot-f1``, ``pot-precision``, ``pot-recall``,
        ``pot-TP``, ``pot-TN``, ``pot-FP``, ``pot-FN``; the chosen
        ``pot-threshold`` (``-1`` when no candidate improved on an F1 of
        0); ``threshold-ratio``, the fraction of scores strictly below that
        threshold; and ``AUPR``.
    """
    mins = np.min(score)
    maxs = np.max(score)
    stride = (maxs - mins) / 1000

    threshold = -1
    maxF1 = [0] * 7
    precisions = []
    recalls = []

    candidate = mins + stride
    while candidate < maxs + stride:
        pred = adjust_predicts(score, label, candidate)
        p_t = calc_point2point(pred, label)
        precisions.append(p_t[1])
        recalls.append(p_t[2])
        # Strict comparison: on ties the lowest threshold is kept.
        if p_t[0] > maxF1[0]:
            maxF1 = p_t
            threshold = candidate
        candidate += stride

    score = np.array(score)
    ratio = (score < threshold).sum() / score.size

    # Recall increment times the precision at the previous threshold.
    AUPR = 0.
    for prev_recall, cur_recall, prev_precision in zip(recalls, recalls[1:], precisions):
        AUPR += (cur_recall - prev_recall) * prev_precision

    return {
        'pot-f1': maxF1[0],
        'pot-precision': maxF1[1],
        'pot-recall': maxF1[2],
        'pot-TP': maxF1[3],
        'pot-TN': maxF1[4],
        'pot-FP': maxF1[5],
        'pot-FN': maxF1[6],
        'pot-threshold': threshold,
        'threshold-ratio': ratio,
        "AUPR": AUPR
    }


def getF1score(score, label, threshold):
    """Evaluate point-adjusted metrics at one fixed threshold.

    Args:
        score: anomaly scores, array-like.
        label: ground-truth labels, array-like.
        threshold (float): scores strictly below it are predicted anomalous.

    Returns:
        dict: ``pot-f1``, ``pot-precision``, ``pot-recall``, ``pot-TP``,
        ``pot-TN``, ``pot-FP``, ``pot-FN``, the given ``pot-threshold`` and
        the detection latency as ``pot-latency``.
    """
    pred, p_latency = adjust_predicts(score, label, threshold, calc_latency=True)
    f1, precision, recall, TP, TN, FP, FN = calc_point2point(pred, label)
    return {
        'pot-f1': f1,
        'pot-precision': precision,
        'pot-recall': recall,
        'pot-TP': TP,
        'pot-TN': TN,
        'pot-FP': FP,
        'pot-FN': FN,
        'pot-threshold': threshold,
        'pot-latency': p_latency
    }