prediction_postprocessing_scripts/merge_cpds.py (146 lines of code) (raw):
import json
import os
import argparse
import sys
import random
import string
# bocpd, amoc, binseg, cpnp, kcpa, mongodb, pelt, rfpop, wbs, zero
def parse_args(argv=None):
    """Parse the command-line options of the merge script.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse falls back to ``sys.argv[1:]``, so
            existing callers that pass nothing behave as before.

    Returns:
        argparse.Namespace with the attributes ``input_directory``,
        ``output_directory``, ``first_method``, ``second_method``,
        ``combination_strategy`` and ``margin`` (an ``int``, default 5).

    Raises:
        SystemExit: raised by argparse when a required option is missing,
            the combination strategy is not one of the allowed choices, or
            the margin is not an integer.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-i",
        "--input-directory",
        help="Path of results input directory",
        required=True,
    )
    parser.add_argument(
        "-o",
        "--output-directory",
        help="Path of results output directory",
        required=True,
    )
    parser.add_argument(
        "-f",
        "--first-method",
        help="First method",
        required=True,
    )
    parser.add_argument(
        "-s",
        "--second-method",
        help="Second method",
        required=True,
    )
    parser.add_argument(
        "-c",
        "--combination-strategy",
        choices=['union', 'intersection_strict', 'intersection_first'],
        help="Strategy of combining the CPLocations from different methods",
        required=True,
    )
    parser.add_argument(
        "-m",
        "--margin",
        help="Margin for True Positive verification",
        # Convert and validate here so a non-numeric margin is reported as a
        # usage error instead of failing later when the value is used.
        type=int,
        default=5,
        required=False,
    )
    return parser.parse_args(argv)
def load_json(json_file_path):
    """Return the parsed JSON content of the UTF-8 text file at *json_file_path*."""
    with open(json_file_path, encoding='utf-8') as handle:
        raw_text = handle.read()
    return json.loads(raw_text)
def prefix_param_arg(first_json_conf, second_json_conf, first_method, second_method):
    """Merge the "parameters" and "args" of two result dicts under method-prefixed keys.

    Every key of ``conf["parameters"]`` / ``conf["args"]`` is renamed to
    ``<method>_<key>``. Entries of the first method are inserted before those
    of the second; if both produce the same prefixed key, the second
    method's value wins.

    Returns:
        A ``(parameters, args)`` tuple of two new dicts.
    """
    merged_params = {}
    merged_args = {}
    sources = ((first_method, first_json_conf), (second_method, second_json_conf))
    for method, conf in sources:
        merged_params.update(
            {method + "_" + name: setting for name, setting in conf["parameters"].items()}
        )
        merged_args.update(
            {method + "_" + name: setting for name, setting in conf["args"].items()}
        )
    return merged_params, merged_args
def merge_cpocations(first_method_cplocations, second_method_cplocations, combination_strategy, margin):
    """Combine two lists of change-point locations into one sorted list.

    Strategies:
        'union': every distinct location found in either list.
        'intersection_strict': distinct locations present in both lists.
        'intersection_first': each location of the first list (duplicates
            kept) that has at least one location of the second list within
            ``margin`` (inclusive, absolute difference).

    ``margin`` is only used by 'intersection_first'. Any other strategy name
    terminates the program through ``sys.exit``.
    """
    if combination_strategy == 'union':
        distinct = set(first_method_cplocations + second_method_cplocations)
        return sorted(distinct)

    if combination_strategy == 'intersection_strict':
        shared = set(first_method_cplocations) & set(second_method_cplocations)
        return sorted(shared)

    if combination_strategy == 'intersection_first':
        confirmed = [
            location
            for location in first_method_cplocations
            if any(abs(location - other) <= margin for other in second_method_cplocations)
        ]
        confirmed.sort()
        return confirmed

    sys.exit('Combination strategy unknown')
def create_fail_file(first_json, second_json, first_method, second_method):
    """Build the combined result dict for a pair of runs that did not both succeed.

    The returned dict has status "FAIL", a fixed error message, the
    method-prefixed parameters/args of both inputs, and a "result" whose
    "cplocations" and "runtime" are both ``None``.

    Only the "dataset" values of the two inputs are compared; "dataset_md5"
    is copied from the second input without being compared. If the datasets
    differ the program terminates through ``sys.exit``.
    """
    if first_json["dataset"] != second_json["dataset"]:
        sys.exit("Data inconsistency found. Check results folder")

    # Key order is kept stable because it determines the layout of the
    # JSON file written from this dict.
    combined = {
        "error": "One of the configurations of the methods is failing",
        "dataset": first_json["dataset"],
        "dataset_md5": second_json["dataset_md5"],
        "hostname": first_json["hostname"],
        "status": "FAIL",
    }
    combined["parameters"], combined["args"] = prefix_param_arg(
        first_json, second_json, first_method, second_method
    )
    combined["result"] = {"cplocations": None, "runtime": None}
    return combined
def merge_files(first_json, second_json, first_method, second_method, combination_strategy, margin):
    """Build the combined result dict for two successful runs on the same dataset.

    The returned dict has status "SUCCESS", no error, the method-prefixed
    parameters/args of both inputs, the change-point locations merged
    according to ``combination_strategy`` / ``margin``, and a runtime equal
    to the sum of both input runtimes.

    Only the "dataset" values of the two inputs are compared; "dataset_md5"
    is copied from the second input without being compared. If the datasets
    differ the program terminates through ``sys.exit``.
    """
    if first_json["dataset"] != second_json["dataset"]:
        sys.exit("Data inconsistency found. Check results folder")

    # Key order is kept stable because it determines the layout of the
    # JSON file written from this dict.
    combined = {
        "error": None,
        "dataset": first_json["dataset"],
        "dataset_md5": second_json["dataset_md5"],
        "hostname": first_json["hostname"],
        "status": "SUCCESS",
    }
    combined["parameters"], combined["args"] = prefix_param_arg(
        first_json, second_json, first_method, second_method
    )
    first_result = first_json["result"]
    second_result = second_json["result"]
    combined["result"] = {
        "cplocations": merge_cpocations(
            first_result["cplocations"],
            second_result["cplocations"],
            combination_strategy,
            margin,
        ),
        "runtime": first_result["runtime"] + second_result["runtime"],
    }
    return combined
def store_json(output_directory, sig_path, result_json, first_second_conf):
    """Write *result_json* to a randomly named JSON file in the output tree.

    The file is created as
    ``<output_directory>/<sig_path>/<first_second_conf>/<name>.json`` where
    ``<name>`` is 15 random lowercase letters/digits. Missing directories
    are created.

    Args:
        output_directory: Root directory of the merged results.
        sig_path: Sub-directory name placed directly under the root.
        result_json: JSON-serialisable object to write (indented by 4).
        first_second_conf: Sub-directory name placed under ``sig_path``.
    """
    target_dir = os.path.join(output_directory, sig_path, first_second_conf)
    # One call creates every missing level; exist_ok avoids the
    # check-then-create race when several runs share an output tree.
    os.makedirs(target_dir, exist_ok=True)

    file_name = ''.join(random.choices(string.ascii_lowercase + string.digits, k=15)) + ".json"
    file_path = os.path.join(target_dir, file_name)
    # Explicit encoding so files are written the same way load_json reads them.
    with open(file_path, "w", encoding="utf-8") as json_file:
        json.dump(result_json, json_file, indent=4)
def process_signature(root, sig_path, first_method, second_method, output_directory, combination_strategy, margin, conf):
    """Combine every pair of result files of two methods for one signature.

    Result files are the ``.json`` files found in
    ``<root>/<sig_path>/<conf>_<first_method>`` and
    ``<root>/<sig_path>/<conf>_<second_method>``. For each
    (first file, second file) pair:

    * both have status 'SUCCESS': the merged result is stored;
    * otherwise, only when ``conf`` is "default": a FAIL record is stored;
    * otherwise nothing is written for that pair.

    Output goes to
    ``<output_directory>/<sig_path>/<conf>_<first_method>_<second_method>``.
    """
    out_folder = '_'.join((conf, first_method, second_method))
    first_dir = os.path.join(root, sig_path, conf + '_' + first_method)
    second_dir = os.path.join(root, sig_path, conf + '_' + second_method)

    for first_name in os.listdir(first_dir):
        first_path = os.path.join(first_dir, first_name)
        if not (os.path.isfile(first_path) and first_path.endswith('.json')):
            continue
        first_result = load_json(first_path)

        for second_name in os.listdir(second_dir):
            second_path = os.path.join(second_dir, second_name)
            if not (os.path.isfile(second_path) and second_path.endswith('.json')):
                continue
            second_result = load_json(second_path)

            both_succeeded = (
                first_result['status'] == 'SUCCESS'
                and second_result['status'] == 'SUCCESS'
            )
            if both_succeeded:
                combined = merge_files(
                    first_result, second_result, first_method, second_method,
                    combination_strategy, margin,
                )
            elif conf == "default":
                # Reaching this branch already means at least one run failed.
                combined = create_fail_file(
                    first_result, second_result, first_method, second_method
                )
            else:
                continue
            store_json(output_directory, sig_path, combined, out_folder)
def main():
    """Entry point: merge the results of two methods for every signature.

    Each entry of the input directory is processed twice, first for the
    'best' configuration and then for the 'default' configuration.
    """
    cli = parse_args()
    input_root = cli.input_directory
    strategy = cli.combination_strategy
    tolerance = int(cli.margin)

    for signature_path in os.listdir(input_root):
        for conf in ('best', 'default'):
            process_signature(
                input_root,
                signature_path,
                cli.first_method,
                cli.second_method,
                cli.output_directory,
                strategy,
                tolerance,
                conf,
            )


# Run the merge only when executed as a script, not when imported.
if __name__ == "__main__":
    main()