# prediction_generation/original-project/analysis/scripts/metrics.py
# -*- coding: utf-8 -*-

"""
Evaluation metrics

Author: G.J.J. van den Burg
Copyright (c) 2020 - The Alan Turing Institute
License: See the LICENSE file.

"""


def true_positives(T, X, margin=5):
    """Compute true positives without double counting

>>> true_positives({1, 10, 20, 23}, {3, 8, 20})
{1, 10, 20}
>>> true_positives({1, 10, 20, 23}, {1, 3, 8, 20})
{1, 10, 20}
>>> true_positives({1, 10, 20, 23}, {1, 3, 5, 8, 20})
{1, 10, 20}
>>> true_positives(set(), {1, 2, 3})
set()
>>> true_positives({1, 2, 3}, set())
set()
"""
    # work on a copy so we don't modify the caller's set
    X = set(X)
    TP = set()
    for tau in T:
        # candidate predictions within the margin, ordered by distance to tau
        close = [(abs(tau - x), x) for x in X if abs(tau - x) <= margin]
        close.sort()
        if not close:
            continue
        # greedily match tau to the nearest remaining prediction and consume
        # it, so one prediction can never count for two true positives
        dist, xstar = close[0]
        TP.add(tau)
        X.remove(xstar)
    return TP
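

# Added note (not in the original file): true_positives is asymmetric. The
# first argument is treated as the ground truth, and the returned set contains
# the ground-truth locations that have a prediction within the margin.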


def f_measure(annotations, predictions, margin=5, alpha=0.5, return_PR=False):
    """Compute the F-measure based on human annotations.

    annotations : dict from user_id to iterable of CP locations
    predictions : iterable of predicted CP locations
    margin : maximal distance between an annotation and a prediction for the
        prediction to count as a true positive
    alpha : value for the F-measure, alpha=0.5 gives the F1-measure
    return_PR : whether to return precision and recall too

    Remember that all CP locations are 0-based!

    >>> f_measure({1: [10, 20], 2: [11, 20], 3: [10], 4: [0, 5]}, [10, 20])
    1.0
    >>> f_measure({1: [], 2: [10], 3: [50]}, [10])
    0.9090909090909091
    >>> f_measure({1: [], 2: [10], 3: [50]}, [])
    0.8
    """
    # ensure 0 is in all the annotation sets
    Tks = {k + 1: set(annotations[uid]) for k, uid in enumerate(annotations)}
    for Tk in Tks.values():
        Tk.add(0)

    X = set(predictions)
    X.add(0)

    # union of the change points of all annotators
    Tstar = set()
    for Tk in Tks.values():
        for tau in Tk:
            Tstar.add(tau)

    K = len(Tks)

    # precision against the union of annotations, recall averaged per annotator
    P = len(true_positives(Tstar, X, margin=margin)) / len(X)

    TPk = {k: true_positives(Tks[k], X, margin=margin) for k in Tks}
    R = 1 / K * sum(len(TPk[k]) / len(Tks[k]) for k in Tks)

    F = P * R / (alpha * R + (1 - alpha) * P)
    if return_PR:
        return F, P, R
    return F
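

# Added note (not in the original file): adding the point 0 to every
# annotation set and to the predictions guarantees P > 0 and R > 0 above, so
# the F-measure denominator alpha * R + (1 - alpha) * P can never be zero.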


def overlap(A, B):
    """Return the overlap (i.e. Jaccard index) of two sets

    >>> overlap({1, 2, 3}, set())
    0.0
    >>> overlap({1, 2, 3}, {2, 5})
    0.25
    >>> overlap(set(), {1, 2, 3})
    0.0
    >>> overlap({1, 2, 3}, {1, 2, 3})
    1.0
    """
    return len(A.intersection(B)) / len(A.union(B))
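

# Added note (not in the original file): overlap() divides by the size of the
# union, so it raises ZeroDivisionError when both sets are empty. That does
# not occur in this module: the segments produced by partition_from_cps are
# non-empty whenever n_obs > 0.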


def partition_from_cps(locations, n_obs):
    """Return a list of sets that give a partition of the set [0, T-1], as
    defined by the change point locations.

    >>> partition_from_cps([], 5)
    [{0, 1, 2, 3, 4}]
    >>> partition_from_cps([3, 5], 8)
    [{0, 1, 2}, {3, 4}, {5, 6, 7}]
    >>> partition_from_cps([1, 2, 7], 8)
    [{0}, {1}, {2, 3, 4, 5, 6}, {7}]
    >>> partition_from_cps([0, 4], 6)
    [{0, 1, 2, 3}, {4, 5}]
    """
    T = n_obs
    partition = []
    current = set()

    all_cps = iter(sorted(set(locations)))
    cp = next(all_cps, None)
    for i in range(T):
        # a change point at index i closes the current segment and opens a
        # new one that starts at i
        if i == cp:
            if current:
                partition.append(current)
            current = set()
            cp = next(all_cps, None)
        current.add(i)
    partition.append(current)
    return partition
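

# Added note (not in the original file): the `if current` guard in
# partition_from_cps means a change point at index 0 never produces an empty
# leading segment, as the [0, 4] doctest above illustrates.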


def cover_single(Sprime, S):
    """Compute the covering of a segmentation S by a segmentation Sprime.

    This follows equation (8) in Arbelaez et al. (2010).

    >>> cover_single([{1, 2, 3}, {4, 5}, {6}], [{1, 2, 3}, {4, 5, 6}])
    0.8333333333333334
    >>> cover_single([{1, 2, 3, 4}, {5, 6}], [{1, 2, 3, 4, 5, 6}])
    0.6666666666666666
    >>> cover_single([{1, 2}, {3, 4}, {5, 6}], [{1, 2, 3}, {4, 5, 6}])
    0.6666666666666666
    >>> cover_single([{1, 2, 3, 4, 5, 6}], [{1}, {2}, {3}, {4, 5, 6}])
    0.3333333333333333
    """
    T = sum(map(len, Sprime))
    # both segmentations must cover the same number of observations
    assert T == sum(map(len, S))

    C = 0
    for R in S:
        # each segment of S contributes its best-matching segment in Sprime,
        # weighted by the segment length
        C += len(R) * max(overlap(R, Rprime) for Rprime in Sprime)
    C /= T
    return C


def covering(annotations, predictions, n_obs):
    """Compute the average segmentation covering against the human annotations.

    annotations : dict from user_id to iterable of CP locations
    predictions : iterable of predicted CP locations
    n_obs : number of observations in the series

    >>> covering({1: [10, 20], 2: [10], 3: [0, 5]}, [10, 20], 45)
    0.7962962962962963
    >>> covering({1: [], 2: [10], 3: [40]}, [10], 45)
    0.7954144620811286
    >>> covering({1: [], 2: [10], 3: [40]}, [], 45)
    0.8189300411522634
    """
    # build one partition per annotator and one from the predictions
    Ak = {
        k + 1: partition_from_cps(annotations[uid], n_obs)
        for k, uid in enumerate(annotations)
    }
    pX = partition_from_cps(predictions, n_obs)

    # average the covering of each annotator's segmentation by the predictions
    Cs = [cover_single(pX, Ak[k]) for k in Ak]
    return sum(Cs) / len(Cs)
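

# A minimal entry point, not part of the original module: it runs the
# embedded doctests and prints an example evaluation that reuses the inputs
# from the docstrings above.
if __name__ == "__main__":
    import doctest

    doctest.testmod()

    annotations = {1: [10, 20], 2: [11, 20], 3: [10], 4: [0, 5]}
    predictions = [10, 20]
    F, P, R = f_measure(annotations, predictions, return_PR=True)
    print(f"F = {F:.3f}, precision = {P:.3f}, recall = {R:.3f}")
    print(f"covering = {covering(annotations, predictions, 45):.3f}")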