prediction_generation/old-code/kcpa_ecp.py (108 lines of code) (raw):
import argparse
import json
import time
import numpy as np
import socket
import hashlib
import ruptures as rpt
def kcpa(data, L, cost):
    """Detect change points with the kernel change point detector of ruptures.

    Builds ``rpt.KernelCPD`` with a linear kernel and a minimum segment size
    of 3, fits it on ``data`` and requests ``L`` breakpoints.

    Args:
        data: Signal to segment; handed unchanged to ``KernelCPD.fit``.
        L: Number of breakpoints requested through ``predict(n_bkps=L)``.
        cost: Forwarded to the ``KernelCPD`` constructor as ``cost``.

    Returns:
        The value returned by ``predict`` (presumably a list of breakpoint
        indices -- confirm against the ruptures documentation).
    """
    # NOTE(review): the documented KernelCPD constructor takes kernel,
    # min_size, jump and params; confirm that the installed ruptures release
    # accepts ``cost``. If it does not, this call raises a TypeError.
    detector = rpt.KernelCPD(kernel="linear", min_size=3, cost=cost)
    fitted = detector.fit(data)
    # The original block repeated fit/predict with default settings after
    # this return; those statements could never run and have been removed.
    return fitted.predict(n_bkps=L)
# Function to handle e.divisive Change Point Analysis
def e_divisive(data, alpha, minsize, siglvl, runs):
    """Fit the e.divisive detector on ``data`` and return its prediction.

    Args:
        data: Signal to segment; handed unchanged to ``fit``.
        alpha: Passed to the detector as ``alpha``.
        minsize: Passed to the detector as ``min_size``.
        siglvl: Passed to the detector as ``significance_level``.
        runs: Passed to ``predict`` as ``n_bkps``.

    Returns:
        The value returned by the detector's ``predict`` call.
    """
    # NOTE(review): confirm that the installed ruptures release exposes an
    # ``Edivisive`` class with these keyword arguments.
    settings = {
        "alpha": alpha,
        "min_size": minsize,
        "significance_level": siglvl,
    }
    detector = rpt.Edivisive(**settings)
    fitted = detector.fit(data)
    return fitted.predict(n_bkps=runs)
# Function to load dataset from a JSON file
def load_dataset(filepath):
    """Read a JSON document from ``filepath`` and return the decoded object.

    The file is opened in binary mode so that ``json.load`` performs its own
    encoding detection (UTF-8, UTF-16 or UTF-32, with or without a byte order
    mark) instead of depending on the platform's default text encoding.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The Python object decoded from the file contents.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the contents are not valid JSON.
    """
    with open(filepath, 'rb') as f:
        return json.load(f)
# Function to get the MD5 hash of a file
def get_md5(filepath):
    """Return the hexadecimal MD5 digest of the file at ``filepath``.

    The file is consumed in 8192-byte reads so that it never has to be held
    in memory as a whole.
    """
    with open(filepath, 'rb') as stream:
        digest = hashlib.md5()
        # iter() with a sentinel keeps calling read() until it returns b''.
        for block in iter(lambda: stream.read(8192), b''):
            digest.update(block)
    return digest.hexdigest()
def _parse_maxcp(value):
    """Convert the -L/--maxcp value: keep the literal 'max', else require an int."""
    if value == 'max':
        return value
    try:
        return int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"expected an integer or 'max', got {value!r}"
        ) from None


# Argument parsing function
def parse_args(argv=None):
    """Parse the command-line options of the script.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            argparse reads ``sys.argv[1:]``, which is what a call without
            arguments has always done.

    Returns:
        argparse.Namespace with the attributes ``input``, ``output``,
        ``algorithm``, ``alpha``, ``minsize``, ``runs``, ``siglvl``, ``cost``
        and ``maxcp``. ``maxcp`` is either the string ``'max'`` or an ``int``.
    """
    parser = argparse.ArgumentParser(description="Run KCPA or e.divisive algorithms on a time series dataset.")
    parser.add_argument('-i', '--input', required=True, help="Path to the input JSON dataset file.")
    parser.add_argument('-o', '--output', help="Path to the output JSON file.")
    parser.add_argument('-a', '--algorithm', choices=['e.divisive', 'kcpa'], required=True, help="Algorithm to run.")
    parser.add_argument('--alpha', type=float, help="Alpha parameter for e.divisive.", default=1.0)
    parser.add_argument('--minsize', type=int, help="Minimum segment size.", default=2)
    parser.add_argument('-R', '--runs', type=int, help="Number of runs for the algorithm.", default=20)
    parser.add_argument('--siglvl', type=float, help="Significance level for e.divisive.", default=0.05)
    parser.add_argument('-C', '--cost', type=float, help="Cost parameter for KCPA.", default=1.0)
    # Without a converter a numeric -L value would stay a string and later be
    # passed on as the number of breakpoints; the converter turns it into an
    # int while leaving the 'max' default untouched.
    parser.add_argument('-L', '--maxcp', type=_parse_maxcp, help="Maximum number of change points for KCPA (or set to 'max').", default='max')
    return parser.parse_args(argv)
def _build_output(args, dataset_md5, status, error, cplocations, runtime):
    """Assemble the result record shared by the success and failure paths."""
    script_path = "/TCPDBench/execs/python/kcpa_ecp.py"
    return {
        "error": error,
        "command": (
            f"python3.9 {script_path} -i {args.input} -a {args.algorithm} "
            f"--alpha {args.alpha} --minsize {args.minsize} "
            f"--runs {args.runs} --siglvl {args.siglvl}"
        ),
        "script": script_path,
        # NOTE(review): the script location is hard-coded; get_md5 raises if
        # the file does not exist at this path.
        "script_md5": get_md5(script_path),
        "hostname": socket.gethostname(),
        # File name of the input without directory and without anything
        # after the first dot.
        "dataset": args.input.split('/')[-1].split('.')[0],
        "dataset_md5": dataset_md5,
        "status": status,
        "parameters": {
            "alpha": args.alpha,
            "minsize": args.minsize,
            "max_cp": args.maxcp,
            "method": args.algorithm,
            "runs": args.runs,
            "siglvl": args.siglvl,
        },
        "arguments": vars(args),
        "result": {
            "cplocations": cplocations,
            "runtime": runtime,
        },
    }


# Main function to execute the selected algorithm
def main():
    """Run the selected algorithm on the input dataset and emit a JSON record.

    Reads the dataset named by ``--input``, takes the ``raw`` values of its
    first series as a single-column array, runs either e.divisive or KCPA,
    and writes a record with status ``SUCCESS`` or ``FAIL`` to ``--output``
    (or prints it when no output path was given).

    Any exception raised while running the algorithm, as well as an empty
    result, is reported as a ``FAIL`` record with ``cplocations`` and
    ``runtime`` set to ``None`` rather than being propagated.
    """
    args = parse_args()

    # Load dataset
    data = load_dataset(args.input)
    time_series = np.array(data["series"][0]["raw"]).reshape(-1, 1)  # Convert to a column vector
    dataset_md5 = get_md5(args.input)

    # Set `maxcp` to a reasonable value if it's 'max'
    if args.maxcp == 'max':
        # Limit L to at most 100 or half the length of the series
        args.maxcp = min(len(time_series) // 2, 100)

    start_time = time.time()
    try:
        # Run the appropriate algorithm
        if args.algorithm == 'e.divisive':
            result = e_divisive(time_series, alpha=args.alpha, runs=args.runs, minsize=args.minsize, siglvl=args.siglvl)
        elif args.algorithm == 'kcpa':
            # A value given on the command line may still be a string; make
            # sure the detector receives an integer breakpoint count. A bad
            # value raises here and is reported as a FAIL record.
            args.maxcp = int(args.maxcp)
            result = kcpa(time_series, L=args.maxcp, cost=args.cost)
        runtime = time.time() - start_time

        # If result is None or empty, raise an error to handle it as a failure
        if not result:
            raise ValueError("No valid change points detected.")

        cplocations = list(map(int, result))
        status, error = "SUCCESS", None
    except Exception as e:
        # Handle any errors and produce a failure output
        status, error = "FAIL", str(e)
        cplocations = None
        runtime = None

    output_data = _build_output(args, dataset_md5, status, error, cplocations, runtime)

    # Write output to file or print to console
    if args.output:
        with open(args.output, 'w') as outfile:
            json.dump(output_data, outfile, indent=4)
    else:
        print(json.dumps(output_data, indent=4))
# Run main() only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()