prediction_generation/original-project/analysis/scripts/metrics.py (63 lines of code) (raw):

# -*- coding: utf-8 -*- """ Evaluation metrics Author: G.J.J. van den Burg Copyright (c) 2020 - The Alan Turing Institute License: See the LICENSE file. """ def true_positives(T, X, margin=5): """Compute true positives without double counting >>> true_positives({1, 10, 20, 23}, {3, 8, 20}) {1, 10, 20} >>> true_positives({1, 10, 20, 23}, {1, 3, 8, 20}) {1, 10, 20} >>> true_positives({1, 10, 20, 23}, {1, 3, 5, 8, 20}) {1, 10, 20} >>> true_positives(set(), {1, 2, 3}) set() >>> true_positives({1, 2, 3}, set()) set() """ # make a copy so we don't affect the caller X = set(list(X)) TP = set() for tau in T: close = [(abs(tau - x), x) for x in X if abs(tau - x) <= margin] close.sort() if not close: continue dist, xstar = close[0] TP.add(tau) X.remove(xstar) return TP def f_measure(annotations, predictions, margin=5, alpha=0.5, return_PR=False): """Compute the F-measure based on human annotations. annotations : dict from user_id to iterable of CP locations predictions : iterable of predicted CP locations alpha : value for the F-measure, alpha=0.5 gives the F1-measure return_PR : whether to return precision and recall too Remember that all CP locations are 0-based! >>> f_measure({1: [10, 20], 2: [11, 20], 3: [10], 4: [0, 5]}, [10, 20]) 1.0 >>> f_measure({1: [], 2: [10], 3: [50]}, [10]) 0.9090909090909091 >>> f_measure({1: [], 2: [10], 3: [50]}, []) 0.8 """ # ensure 0 is in all the sets Tks = {k + 1: set(annotations[uid]) for k, uid in enumerate(annotations)} for Tk in Tks.values(): Tk.add(0) X = set(predictions) X.add(0) Tstar = set() for Tk in Tks.values(): for tau in Tk: Tstar.add(tau) K = len(Tks) P = len(true_positives(Tstar, X, margin=margin)) / len(X) TPk = {k: true_positives(Tks[k], X, margin=margin) for k in Tks} R = 1 / K * sum(len(TPk[k]) / len(Tks[k]) for k in Tks) F = P * R / (alpha * R + (1 - alpha) * P) if return_PR: return F, P, R return F def overlap(A, B): """ Return the overlap (i.e. Jaccard index) of two sets >>> overlap({1, 2, 3}, set()) 0.0 >>> overlap({1, 2, 3}, {2, 5}) 0.25 >>> overlap(set(), {1, 2, 3}) 0.0 >>> overlap({1, 2, 3}, {1, 2, 3}) 1.0 """ return len(A.intersection(B)) / len(A.union(B)) def partition_from_cps(locations, n_obs): """ Return a list of sets that give a partition of the set [0, T-1], as defined by the change point locations. >>> partition_from_cps([], 5) [{0, 1, 2, 3, 4}] >>> partition_from_cps([3, 5], 8) [{0, 1, 2}, {3, 4}, {5, 6, 7}] >>> partition_from_cps([1,2,7], 8) [{0}, {1}, {2, 3, 4, 5, 6}, {7}] >>> partition_from_cps([0, 4], 6) [{0, 1, 2, 3}, {4, 5}] """ T = n_obs partition = [] current = set() all_cps = iter(sorted(set(locations))) cp = next(all_cps, None) for i in range(T): if i == cp: if current: partition.append(current) current = set() cp = next(all_cps, None) current.add(i) partition.append(current) return partition def cover_single(Sprime, S): """Compute the covering of a segmentation S by a segmentation Sprime. This follows equation (8) in Arbaleaz, 2010. >>> cover_single([{1, 2, 3}, {4, 5}, {6}], [{1, 2, 3}, {4, 5, 6}]) 0.8333333333333334 >>> cover_single([{1, 2, 3, 4}, {5, 6}], [{1, 2, 3, 4, 5, 6}]) 0.6666666666666666 >>> cover_single([{1, 2}, {3, 4}, {5, 6}], [{1, 2, 3}, {4, 5, 6}]) 0.6666666666666666 >>> cover_single([{1, 2, 3, 4, 5, 6}], [{1}, {2}, {3}, {4, 5, 6}]) 0.3333333333333333 """ T = sum(map(len, Sprime)) assert T == sum(map(len, S)) C = 0 for R in S: C += len(R) * max(overlap(R, Rprime) for Rprime in Sprime) C /= T return C def covering(annotations, predictions, n_obs): """Compute the average segmentation covering against the human annotations. annotations : dict from user_id to iterable of CP locations predictions : iterable of predicted Cp locations n_obs : number of observations in the series >>> covering({1: [10, 20], 2: [10], 3: [0, 5]}, [10, 20], 45) 0.7962962962962963 >>> covering({1: [], 2: [10], 3: [40]}, [10], 45) 0.7954144620811286 >>> covering({1: [], 2: [10], 3: [40]}, [], 45) 0.8189300411522634 """ Ak = { k + 1: partition_from_cps(annotations[uid], n_obs) for k, uid in enumerate(annotations) } pX = partition_from_cps(predictions, n_obs) Cs = [cover_single(pX, Ak[k]) for k in Ak] return sum(Cs) / len(Cs)