in tests.py [0:0]
import os
from datetime import datetime, timedelta
from typing import Tuple

import pandas as pd

# NOTE: to_seconds(), used below to convert "HH:MM:SS" cputime strings to seconds,
# is assumed to be defined elsewhere in tests.py.


def prepare_df(start_date: datetime, end_date: datetime, Presto: bool = True, Spark: bool = True) -> Tuple[pd.DataFrame, str]:
if end_date - start_date != timedelta(days=6):
raise ValueError("The date range must be exactly 7 days, check the input")
if not Presto and not Spark:
raise ValueError("At least one of Presto and Spark must be True to have data")
    if end_date <= datetime(2024, 5, 9):
        assert Presto and not Spark, "Only Presto data is available before 2024-05-09"
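        # Old traces (weeks ending on or before 2024-05-09): a single combined CSV named
        # with MMDD dates; cputime is an "HH:MM:SS" string and there is no output-size
        # column, so outputDataSize is filled with 0 below.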
job_data_access_df = pd.read_csv(os.path.join("oldTraces",
f"report-abFP-volume-table-{start_date.strftime('%m%d')}-{end_date.strftime('%m%d')}-all.csv"),
dtype = {'abstractFingerPrint': str,
'db_name': str,
'table_name': str,
'inputDataSize': float,
'cputime': str
})
job_data_access_df['db_name'] = job_data_access_df['db_name'].astype(str)
job_data_access_df['table_name'] = job_data_access_df['table_name'].astype(str)
job_data_access_df['cputime'] = job_data_access_df['cputime'].apply(to_seconds)
job_data_access_df['outputDataSize'] = 0
workload_print_info = f"{start_date.strftime('%Y%m%d')}-{end_date.strftime('%Y%m%d')} Presto jobs"
else:
workload_print_info = f"{start_date.strftime('%Y%m%d')}-{end_date.strftime('%Y%m%d')}"
if Presto:
presto_job = pd.read_csv(os.path.join("newTraces", f"report-abFP-volume-table-{start_date.strftime('%Y%m%d')}-{end_date.strftime('%Y%m%d')}-Presto.csv"),
dtype={'abstractFingerPrint': str,
'db_name': str,
'table_name': str,
'inputDataSize': float,
'outputDataSize': float,
'cputime': float
})
presto_job['db_name'] = presto_job['db_name'].astype(str)
presto_job['table_name'] = presto_job['table_name'].astype(str)
workload_print_info += " Presto"
else:
presto_job = pd.DataFrame()
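        # Optionally load the Spark trace; its cputime is repeated on every row of an
        # abstractFingerPrint and is normalized further down.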
if Spark:
spark_job = pd.read_csv(os.path.join("newTraces", f"report-abFP-volume-table-{start_date.strftime('%Y%m%d')}-{end_date.strftime('%Y%m%d')}-Spark.csv"),
dtype={'abstractFingerPrint': str,
'db_name': str,
'table_name': str,
'inputDataSize': float,
'outputDataSize': float,
'cputime': float
})
spark_job['db_name'] = spark_job['db_name'].astype(str)
spark_job['table_name'] = spark_job['table_name'].astype(str)
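            # cputime is a per-fingerprint total repeated on every row of that fingerprint.
            # Compute the de-duplicated total, then divide each row by its fingerprint's
            # row count so that summing the column reproduces that total.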
total_cputime = spark_job.groupby("abstractFingerPrint")["cputime"].first().sum()
print(f"Total cputime of Spark jobs: {total_cputime}")
abFP_counts = spark_job['abstractFingerPrint'].value_counts()
spark_job["cputime"] /= spark_job["abstractFingerPrint"].map(abFP_counts)
print(f"should == Total cputime of Spark jobs after normalization: {spark_job['cputime'].sum()}")
#assert spark_job['cputime'].sum() // 1000 == total_cputime // 1000, "Normalization error"
workload_print_info += " Spark"
else:
spark_job = pd.DataFrame()
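        # Combine whichever engine traces were loaded (a skipped engine contributes an
        # empty frame).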
job_data_access_df = pd.concat([presto_job, spark_job], ignore_index=True)
workload_print_info += " jobs"
return job_data_access_df, workload_print_info
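

# Usage sketch (not part of the original tests): shows how prepare_df might be called
# for one 7-day window. The trace CSVs and the to_seconds helper referenced above are
# assumed to exist; the dates below are illustrative only.
if __name__ == "__main__":
    week_start = datetime(2024, 5, 13)
    week_end = week_start + timedelta(days=6)
    df, label = prepare_df(week_start, week_end, Presto=True, Spark=True)
    print(label, f"{len(df)} rows, total cputime {df['cputime'].sum():.0f}s")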