# prediction_postprocessing_scripts/dynamic_time_warping_analysis.py
# Helper functions referenced below (parse_args, categorize_platform,
# load_time_series, load_signature_ids, compute_dtw_matrix,
# best_silhouette_score, save_cluster_assignments, plot_silhouette_score,
# plot_elbow_method, save_statistics) are defined elsewhere in this script.
import json
import os

import numpy as np
import pandas as pd
from sklearn.cluster import AgglomerativeClustering

def main():
    args = parse_args()

    print("[INFO] Loading time series metadata...")
    with open(args.characteristics) as f:
        data = json.load(f)
    df_characteristics = pd.DataFrame.from_dict(data, orient="index").reset_index()
    df_characteristics.rename(columns={"index": "signature_id", "product": "application"}, inplace=True)
    df_characteristics["signature_id"] = pd.to_numeric(df_characteristics["signature_id"], errors="coerce").astype("Int64")
    platforms = df_characteristics["platform"].unique()
    # Bucket the raw platform strings into coarse OS categories.
    segmented = {category: [] for category in ["Windows", "macOS", "Linux", "Android", "Others"]}
    for platform in platforms:
        category = categorize_platform(platform)
        segmented[category].append(platform)
    df_characteristics["platform_category"] = df_characteristics["platform"].apply(categorize_platform)

    # Load manual k values if provided
    manual_k = {}
    if args.manual_k:
        with open(args.manual_k) as f:
            manual_k = json.load(f)
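    # manual_k is assumed to map "<repository>_<application>" keys to a chosen
    # cluster count, matching the lookups below, e.g. {"repo-name_app-name": 3}
    # (the example key and value are illustrative, not taken from real data).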
print("[INFO] Loading available time series...")
time_series_dict = load_time_series(args.input_folder)
os.makedirs(args.output_folder, exist_ok=True)
silhouette_scores, inertia_scores = [], []
stats = dict()
# Check if the signature IDs file is provided
if args.signature_ids_file:
print("[INFO] Loading signature IDs from file...")
signature_ids_to_process = load_signature_ids(args.signature_ids_file)
else:
print("[INFO] Processing all available time series...")
signature_ids_to_process = list(time_series_dict.keys())
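    # Each (repository, application) pair is clustered independently, so DTW
    # distances are only ever computed between series of the same application.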
    for (repo, application), group in df_characteristics.groupby(["repository", "application"]):
        signature_ids = group["signature_id"].tolist()
        # Keep only IDs that are both requested and actually loaded, so the
        # time_series_dict lookup cannot raise a KeyError.
        available_ids = [
            sig_id
            for sig_id in signature_ids
            if sig_id in signature_ids_to_process and sig_id in time_series_dict
        ]
        time_series = [time_series_dict[sig_id] for sig_id in available_ids]
        if not time_series:
            print(f"[INFO] Skipping ({repo}, {application}) - No available time series.")
            continue
        print(f"[INFO] Processing ({repo}, {application}) - {len(time_series)} available time series.")

        distance_matrix = compute_dtw_matrix(time_series)
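        # compute_dtw_matrix is expected to return a symmetric n x n matrix of
        # pairwise DTW distances with a zero diagonal (n = len(time_series));
        # that is the shape metric="precomputed" requires further down.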
        # Pass the manual k chosen for this group if one was provided;
        # otherwise .get() returns None and best_silhouette_score falls back
        # to searching up to args.max_clusters.
        group_key = f"{repo}_{application}"
        optimal_clusters, scores, inertia = best_silhouette_score(
            distance_matrix, args.max_clusters, manual_k=manual_k.get(group_key)
        )

        # Ensure that scores are not empty before proceeding
        if not scores:
            print(f"[WARNING] Skipping clustering for ({repo}, {application}) - No valid silhouette score.")
            continue

        # Collect statistics for each repository-application pair.
        # float() casts keep the numpy scalars JSON-serializable.
        stats[group_key] = {
            "optimal_clusters": optimal_clusters,
            "silhouette_score": {
                "min": float(np.min(scores)),
                "max": float(np.max(scores)),
                "mean": float(np.mean(scores)),
                "median": float(np.median(scores)),
            },
            "inertia": {
                "min": float(np.min(inertia)) if inertia else None,
                "max": float(np.max(inertia)) if inertia else None,
                "mean": float(np.mean(inertia)) if inertia else None,
                "median": float(np.median(inertia)) if inertia else None,
            },
        }
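        # Agglomerative clustering with metric="precomputed" consumes the DTW
        # matrix directly; average linkage merges the two clusters whose
        # members have the smallest mean pairwise distance.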
        # Proceed with clustering using the best number of clusters: a manual
        # k for this group takes precedence; the fallback re-derives the
        # silhouette-optimal k (argmax index + 2, i.e. the search is assumed
        # to start at k=2).
        optimal_clusters = manual_k.get(group_key, int(np.argmax(scores)) + 2)
        clustering = AgglomerativeClustering(n_clusters=optimal_clusters, metric="precomputed", linkage="average")
        labels = clustering.fit_predict(distance_matrix)

        # Save the cluster assignments to CSV
        save_cluster_assignments(f"{group_key}_{len(time_series)}", available_ids, labels, args.output_folder)

        # Generate and save the two plots (Silhouette and Elbow)
        plot_silhouette_score(scores, args.output_folder, f"{group_key}_{len(time_series)}")
        plot_elbow_method(inertia, args.output_folder, f"{group_key}_{len(time_series)}")

    # Save the accumulated statistics for all groups to JSON
    save_statistics(stats, args.output_folder)
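

# Entry point (assumed; not shown in the original excerpt).
if __name__ == "__main__":
    main()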