prediction_generation/summarize.py (101 lines of code) (raw):
"""
Summarize the results into a single file per dataset.
For each dataset we want::
{
"dataset": "<name>",
"dataset_nobs": N,
"dataset_ndim": N,
"annotations": {
"<user_id>": [...],
"<user_id>": [...],
},
"results": {
"<method>": [
{
"parameters": {
"<param>": value,
},
"arguments": {
"<arg>": value,
},
"cplocations": [...],
"scores": {
"<score_1>": value,
},
"status": <status>
},
{
"parameters": {
"<param>": value,
},
"arguments": {
"<arg>": value,
},
"cplocations": [...],
"scores": {
"<score_1>": value,
},
"status": <status>
},
],
}
}
Basic cleanup on the change point locations will also be performed:
- deduplication
- removal of invalid indices. Recall that indices are 0-based. We remove
any indices smaller than 1 and larger than n_obs - 2. The reason that we
don't allow 0 or n_obs - 1 (both valid endpoints) is that several
algorithms declare these locations as change points by default and they
are meaningless.
Author: Gertjan van den Burg
Copyright (c) 2020 - The Alan Turing Institute
License: See the LICENSE file.
"""
import argparse
import json
import os
import sys
from metrics import f_measure, covering
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
"-a",
"--annotation-file",
help="Path to annotation file",
required=True,
)
parser.add_argument(
"-d", "--dataset-file", help="Path to dataset file", required=True
)
parser.add_argument(
"-r", "--result-dir", help="Directory of abed results", required=True
)
parser.add_argument("-o", "--output-file", help="File to write to")
return parser.parse_args()
def load_json(filename):
with open(filename, "r") as fp:
try:
s = fp.read()
s = s[s.find('{'): s.rfind('}') + 1] # sometimes, abed writes non-sense to the files -> filter it out
data = json.loads(s)
except json.decoder.JSONDecodeError:
print("Error parsing json file: %s" % filename, file=sys.stderr)
return {"error": "parsing error"}
return data
def load_annotations(filename, dataset):
with open(filename, "r") as fp:
data = json.load(fp)
return data[dataset]
def clean_cps(locations, dataset):
n_obs = dataset["n_obs"]
valid = set([x for x in locations if 1 <= x < n_obs - 1])
return sorted(valid)
def main():
args = parse_args()
dataset = load_json(args.dataset_file)
annotations = load_annotations(args.annotation_file, dataset["name"])
out = {
"dataset": dataset["name"],
"dataset_nobs": dataset["n_obs"],
"dataset_ndim": dataset["n_dim"],
"annotations": annotations,
"results": {},
}
data_results = next(
(d for d in os.listdir(args.result_dir) if d == dataset["name"]), None
)
if data_results is None:
print(
"Couldn't find the result directory for dataset %s"
% dataset["name"],
file=sys.stderr,
)
raise SystemExit(1)
dataset_dir = os.path.join(args.result_dir, data_results)
for method in sorted(os.listdir(dataset_dir)):
method_dir = os.path.join(dataset_dir, method)
for result_file in sorted(os.listdir(method_dir)):
fname = os.path.join(method_dir, result_file)
result = load_json(fname)
if not method in out["results"]:
out["results"][method] = []
if result["status"].lower() == "success":
locations = clean_cps(result["result"]["cplocations"], dataset)
f1, precision, recall = f_measure(
annotations, locations, return_PR=True
)
n_obs = dataset["n_obs"]
cover = covering(annotations, locations, n_obs)
scores = {
"f1": f1,
"precision": precision,
"recall": recall,
"cover": cover,
}
else:
locations = None
scores = None
out["results"][method].append(
{
"parameters": result["parameters"],
"task_file": result_file,
"cplocations": locations,
"scores": scores,
"status": result['status'],
"args": result['args']
}
)
if args.output_file:
with open(args.output_file, "w") as fp:
json.dump(out, fp, indent="\t")
else:
print(json.dumps(out, indent="\t"))
if __name__ == "__main__":
main()