# prediction_generation/showcase_diagram.py
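"""Generate a showcase diagram for a single timeseries signature.

Reads per-signature summary JSON files and timeseries CSVs from
showcase-diagram-results/, selects a changepoint-detection (CPD)
configuration according to the chosen evaluation mode (default, best, or
oracle), and plots the series with alert statuses and detected changepoints.

Example invocation (the method name and signature ID below are illustrative;
pass whichever CPD method and signature your summary files were produced for):

    python showcase_diagram.py -m binseg -t 12345 -e best -s f1
"""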
import argparse
import json
import os
import sys

import matplotlib.pyplot as plt
import pandas as pd

summaries_folder_path = "showcase-diagram-results/JSON3"
csvs_folder_path = "showcase-diagram-results/CSV3"

# Load every per-dataset summary JSON into memory once, at module load time.
datasets_metrics = []
for filename in os.listdir(summaries_folder_path):
    if filename.endswith('.json'):
        file_path = os.path.join(summaries_folder_path, filename)
        with open(file_path, 'r') as file:
            data = json.load(file)
            datasets_metrics.append(data)

def process_best(method, metric):
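    """Return the hyperparameter configuration (serialized as a sorted-key
    JSON string) whose `metric` score, averaged over every loaded dataset
    summary, is highest, together with that average score."""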
    hyperparams = dict()
    for dataset_metrics in datasets_metrics:
        for unit_method in dataset_metrics["results"]["best_" + method]:
            if unit_method["status"] == "SUCCESS":
                conf = unit_method["args"]
                metrics = unit_method["scores"]
                metric_value = metrics[metric]
                # Serialize the hyperparameter dict with sorted keys so that
                # identical configurations are grouped across datasets.
                conf_str = json.dumps(conf, sort_keys=True)
                if conf_str in hyperparams:
                    hyperparams[conf_str].append(metric_value)
                else:
                    hyperparams[conf_str] = [metric_value]
    # Average the metric per configuration and pick the best-scoring one.
    dict_metric = {key: sum(value) / len(value) for key, value in hyperparams.items()}
    best_conf = max(dict_metric, key=dict_metric.get)
    best_conf_score = dict_metric[best_conf]
    return best_conf, best_conf_score

def process_oracle(data, method, metric):
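    """Return the configuration that scores best on `metric` for this single
    dataset summary (the oracle choice), together with its score."""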
    elements = data["results"]["best_" + method]
    elements_dict = {
        json.dumps(element["args"], sort_keys=True): element["scores"][metric]
        for element in elements
        if "args" in element and element.get("scores") is not None
    }
    oracle_conf = max(elements_dict, key=elements_dict.get)
    oracle_conf_score = elements_dict[oracle_conf]
    return oracle_conf, oracle_conf_score

def process_default(data, method, metric):
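    """Return the method's default configuration and its `metric` score."""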
    default_conf = data["results"]["default_" + method][0]["args"]
    default_conf_score = data["results"]["default_" + method][0]["scores"][metric]
    return default_conf, default_conf_score

def parse_args():
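    """Parse the command-line arguments for the diagram generator."""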
    parser = argparse.ArgumentParser(description="Generator of timeseries graphs")
    parser.add_argument("-m", "--method", help="CPD method", required=True)
    parser.add_argument("-t", "--timeseriessignature", help="timeseries signature", required=True)
    parser.add_argument("-e", "--evaluationmode", help="Results related to evaluation mode",
                        choices=['default', 'best', 'oracle'], required=True)
    parser.add_argument("-s", "--scoremetric",
                        help="Metric on which to base evaluation in case of best or oracle modes",
                        choices=['precision', 'recall', 'f1'], required=False)
    return parser.parse_args()

def fetch_data(signature_id, method, mode, metric):
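    """Load the timeseries CSV and summary JSON for `signature_id`, select a
    configuration according to the evaluation `mode`, and return the
    dataframe, the chosen configuration, its score, and its changepoint
    locations (None if no matching configuration is found)."""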
    df = pd.read_csv(os.path.join(csvs_folder_path, signature_id + "_timeseries_data.csv"))
    with open(os.path.join(summaries_folder_path, "summary_" + signature_id + ".json"), "r") as fp:
        try:
            s = fp.read()
            # Trim anything before the first '{' and after the last '}' so
            # stray text around the JSON object does not break parsing.
            s = s[s.find('{'): s.rfind('}') + 1]
            data = json.loads(s)
        except json.decoder.JSONDecodeError:
            sys.exit("Error parsing JSON file: %s" % signature_id)
    conf_cplocations = None
    if mode == "best":
        conf, conf_score = process_best(method, metric)
        for elem in data["results"]["best_" + method]:
            if json.dumps(elem["args"], sort_keys=True) == conf:
                conf_cplocations = elem["cplocations"]
    elif mode == "oracle":
        conf, conf_score = process_oracle(data, method, metric)
        for elem in data["results"]["best_" + method]:
            if json.dumps(elem["args"], sort_keys=True) == conf:
                conf_cplocations = elem["cplocations"]
    elif mode == "default":
        conf, conf_score = process_default(data, method, metric)
        conf_cplocations = data["results"]["default_" + method][0]["cplocations"]
    else:
        sys.exit("Evaluation mode does not exist")
    return df, conf, conf_score, conf_cplocations

def display_timeseries(sample_df, sig_id, cplocations=None):
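    """Plot the timeseries of one signature, colouring points by alert status
    and marking detected changepoints with yellow dashed vertical lines."""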
    # Ensure 'push_timestamp' is treated as datetime and use it as the index.
    sample_df['push_timestamp'] = pd.to_datetime(sample_df['push_timestamp'])
    sample_df.set_index('push_timestamp', inplace=True)
    plt.figure(figsize=(20, 10))
    color_mapping = {
        'TP': 'green',
        'FP': 'red',
        'SP': 'grey',
        'acknowledged': 'green'
    }
    # Plot each measurement, coloured by its alert status; alerted points
    # also get a vertical dashed line in the matching colour.
    for idx, row in sample_df.iterrows():
        plt.plot(idx, row['value'], marker='o', markersize=8,
                 color=color_mapping.get(row['test_status_general'], 'blue'), alpha=0.6)
        if row['test_status_general'] in ['TP', 'FP', 'SP', 'acknowledged']:
            plt.axvline(x=idx, color=color_mapping.get(row['test_status_general']),
                        linestyle='--', alpha=0.6)
    # Add yellow lines for changepoints if they exist
    if cplocations is not None:
        for cp in cplocations:
            # Convert integer indices in cplocations to corresponding timestamps
            if isinstance(cp, int):
                cp_timestamp = sample_df.index[cp]  # Get the timestamp from the index
            else:
                cp_timestamp = pd.to_datetime(cp)  # In case cp is already a timestamp
            plt.axvline(x=cp_timestamp, color='yellow', linestyle='--', alpha=0.9)
    plt.title('Time Series Plot')
    plt.xlabel('Date')
    plt.ylabel(f'Measurement values associated with signature ID {sig_id}')
    plt.grid(axis='y')
    # Define axis limits: full date range on x, headroom above the data on y.
    plt.xlim(sample_df.index.min(), sample_df.index.max())
    y_min = 0
    y_max = sample_df['value'].max() * 2
    plt.ylim(bottom=y_min, top=y_max)
    # Set weekly (Monday) ticks for the x-axis.
    start_date = sample_df.index.min()
    end_date = sample_df.index.max()
    weekly_ticks = pd.date_range(start=start_date, end=end_date, freq='W-MON')
    plt.xticks(weekly_ticks, rotation=45)
    plt.show()

def main():
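    """Entry point: parse CLI arguments, fetch the requested results, print
    the chosen configuration and its score, then display the plot."""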
    args = parse_args()
    method = args.method
    signature = args.timeseriessignature
    metric = args.scoremetric
    mode = args.evaluationmode
    df, conf, conf_score, conf_cplocations = fetch_data(str(signature), method, mode, metric)
    print(conf)
    print(conf_score)
    display_timeseries(df, signature, conf_cplocations)


if __name__ == "__main__":
    main()