import os
from datetime import datetime, timedelta

import pandas as pd


# Column dtypes shared by the Presto and Spark daily trace CSVs.
_TRACE_DTYPES = {
    'job_id': str, 'start_time': str, 'duration': float,
    'cputime': float, 'db_name': str, 'table_name': str,
    'uown_names': str, 'inputDataSize': float,
    'outputDataSize': float, 'template_id': str,
}

# Columns kept from each daily trace before the weekly aggregation.
_TRACE_COLUMNS = ['db_name', 'table_name', 'inputDataSize', 'outputDataSize',
                  'cputime', 'uown_names']


def _read_daily_trace(csv_folder, day, engine):
    """Read the ``<YYYYMMDD>-<engine>.csv`` trace of one day from *csv_folder*."""
    return pd.read_csv(
        f"{csv_folder}/{day.strftime('%Y%m%d')}-{engine}.csv",
        dtype=_TRACE_DTYPES,
        na_values=['\\N'],  # the literal two-character token \N marks a missing value
    )


def generate_Yugong_weekly_traces(csv_folder="../jobTraces",
                                  output_folder="../yugongTraces",
                                  start_date="20241022",
                                  end_date="20250127"):
    """Merge daily Presto/Spark job traces into one aggregated CSV per week.

    For every complete 7-day window between *start_date* and *end_date*
    (both inclusive), the daily ``<YYYYMMDD>-Presto.csv`` and
    ``<YYYYMMDD>-Spark.csv`` files are read, concatenated, and summed per
    ``(uown_names, db_name, table_name)``. The result is written to
    ``report-uown-volume-table-<week_start>-<week_end>.csv`` with the
    ``uown_names`` column renamed to ``abstractFingerPrint``. A trailing
    window shorter than seven days is not processed.

    Args:
        csv_folder: Directory holding the daily trace CSVs.
        output_folder: Directory for the weekly reports; created if missing.
        start_date: First day to process, formatted as ``YYYYMMDD``.
        end_date: Last day to process, formatted as ``YYYYMMDD``.

    Raises:
        FileNotFoundError: If a daily trace CSV inside a processed week is
            missing.
    """
    os.makedirs(output_folder, exist_ok=True)
    first_day = datetime.strptime(start_date, "%Y%m%d")
    last_day = datetime.strptime(end_date, "%Y%m%d")
    number_of_dates = (last_day - first_day).days + 1

    # A week starting at `offset` covers days offset..offset+6, so the last
    # valid offset is number_of_dates - 7 and the exclusive bound must be
    # number_of_dates - 6. A bound of number_of_dates - 7 would drop the final
    # complete week whenever the range is an exact multiple of seven days.
    for offset in range(0, number_of_dates - 6, 7):
        week_start = first_day + timedelta(days=offset)
        week_end = week_start + timedelta(days=6)
        print(f"Processing {week_start:%Y%m%d} to {week_end:%Y%m%d}...")

        weekly_dfs = []
        for date in pd.date_range(week_start, week_end):  # is inclusive
            df_presto = _read_daily_trace(csv_folder, date, "Presto")[_TRACE_COLUMNS]

            df_spark = _read_daily_trace(csv_folder, date, "Spark")
            # Split each Spark row's cputime evenly across all rows sharing its
            # job_id, so summing the rows of a job yields one row's original
            # value (presumably cputime is a per-job total repeated on every
            # row of the job -- TODO confirm against the trace producer).
            rows_per_job = df_spark['job_id'].value_counts()
            df_spark["cputime"] /= df_spark["job_id"].map(rows_per_job)
            df_spark = df_spark[_TRACE_COLUMNS]

            weekly_dfs.extend([df_spark, df_presto])

        merged_df = pd.concat(weekly_dfs, ignore_index=True)

        # NOTE: groupby drops rows whose key columns contain NaN (pandas
        # default dropna=True), so jobs without uown_names, db_name or
        # table_name do not appear in the report.
        merged_df = merged_df.groupby(["uown_names", "db_name", "table_name"]).agg({
            "inputDataSize": "sum", "outputDataSize": "sum", "cputime": "sum"
        }).reset_index()

        merged_df.rename(columns={"uown_names": "abstractFingerPrint"}, inplace=True)
        output_path = f"{output_folder}/report-uown-volume-table-{week_start:%Y%m%d}-{week_end:%Y%m%d}.csv"
        merged_df.to_csv(output_path, index=False)
        print(f"Generated {output_path}.")

# Generate the weekly traces only when this file is executed as a script.
if __name__ == "__main__":
    generate_Yugong_weekly_traces()