import os
from datetime import datetime, timedelta

import pandas as pd


def generate_weekly_traces(engine_type="Spark"):
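    """Aggregate daily per-engine job-trace CSVs into weekly reports.

    Reads {YYYYMMDD}-{engine_type}.csv files from ../jobTraces, cleans them,
    and writes one CSV per 7-day window to ../newTraces with inputDataSize,
    outputDataSize, and cputime summed per (template, db, table) combination.
    """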
    if engine_type not in ("Spark", "Presto"):
        raise ValueError("engine_type must be either 'Spark' or 'Presto'")
    csv_folder = "../jobTraces"
    output_folder = "../newTraces"
    os.makedirs(output_folder, exist_ok=True)
    start_date = datetime.strptime("20241022", "%Y%m%d")
    end_date = datetime.strptime("20250127", "%Y%m%d")
    number_of_dates = (end_date - start_date).days + 1

    # Step through the range in non-overlapping 7-day windows; a stop of
    # number_of_dates - 6 ensures the final full week (ending on end_date)
    # is included.
    for i in range(0, number_of_dates - 6, 7):
        week_start = start_date + timedelta(days=i)
        week_end = week_start + timedelta(days=6)
        print(f"Processing {week_start:%Y%m%d} to {week_end:%Y%m%d}...")

        weekly_dfs = []
        for date in pd.date_range(week_start, week_end):  # inclusive of both endpoints
            try:
                df = pd.read_csv(f"{csv_folder}/{date.strftime('%Y%m%d')}-{engine_type}.csv")
                df = df[['job_id', 'template_id', 'db_name', 'table_name',
                         'inputDataSize', 'outputDataSize', 'cputime']]
            except FileNotFoundError:
                print(f"File not found: {date.strftime('%Y%m%d')}-{engine_type}.csv; "
                      f"skipping the remaining days of this week.")
                break

            # Normalize the key columns to pandas' nullable string dtype.
            df = df.astype({'job_id': 'string', 'template_id': 'string',
                            'db_name': 'string', 'table_name': 'string'})

            if df['job_id'].isnull().any():
                print(f"Warning: Missing job_id values found. {df['job_id'].isnull().sum()}"
                      f" rows will be dropped.")
            df = df.dropna(subset=["job_id"])

            # template_id is a string dtype at this point, so the unknown
            # marker is "-1", not the integer -1.
            bad = (df['template_id'] == "-1").sum()
            if bad:
                print(f"Warning: {bad} rows have template_id = -1 (unknown) on {date:%Y-%m-%d}; dropped.")
            df = df[df["template_id"] != "-1"]

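            # Spark traces carry multiple rows per job_id; dividing cputime by
            # the per-job row count means the weekly sum below counts each
            # job's cputime once rather than once per row.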
            if engine_type == "Spark":
                job_row_counts = df['job_id'].value_counts()
                df["cputime"] /= df["job_id"].map(job_row_counts)

            weekly_dfs.append(df)

        if not weekly_dfs:
            # Nothing was collected (the week's first file was missing);
            # pd.concat would raise on an empty list.
            print(f"No data for week {week_start:%Y%m%d}-{week_end:%Y%m%d}; skipped.")
            continue
        merged_df = pd.concat(weekly_dfs, ignore_index=True)

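        # Collapse the week to one row per (template_id, db_name, table_name),
        # summing input/output volumes and cputime.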
        merged_df = merged_df.groupby(["template_id", "db_name", "table_name"]).agg({
            "inputDataSize": "sum", "outputDataSize": "sum", "cputime": "sum"
        }).reset_index()

        merged_df.rename(columns={"template_id": "abstractFingerPrint"}, inplace=True)
        output_path = f"{output_folder}/report-abFP-volume-table-{week_start:%Y%m%d}-{week_end:%Y%m%d}-{engine_type}.csv"
        merged_df.to_csv(output_path, index=False)
        print(f"Generated {output_path}.")


if __name__ == "__main__":
    generate_weekly_traces(engine_type="Spark")
    generate_weekly_traces(engine_type="Presto")