scripts/jobtrace_to_yugong.py
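
"""Convert daily Presto/Spark job traces into weekly per-owner Yugong traces.

For every full week in the configured date range, the daily per-job CSVs are
loaded, reduced to (owner, database, table) rows, aggregated by owner, and
written out as one CSV per week.
"""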
import os
from datetime import datetime, timedelta

import pandas as pd

from Yugong.Ownership import Ownership
from utility import human_readable_size


def generate_Yugong_weekly_traces():
    csv_folder = "../jobTraces"
    output_folder = "../yugongTraces"
    os.makedirs(output_folder, exist_ok=True)

    start_date = datetime.strptime("20241022", "%Y%m%d")
    end_date = datetime.strptime("20250204", "%Y%m%d")
    number_of_dates = (end_date - start_date).days + 1

    ownership = Ownership(query_ownership_file_1=None, query_ownership_file_2=None,
                          table_ownership_file=None)
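
    # Walk the date range one complete week at a time; a trailing partial
    # week is not processed.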
    for i in range(0, number_of_dates - 6, 7):  # -6 (not -7) so the last complete week is kept
        week_start = start_date + timedelta(days=i)
        week_end = week_start + timedelta(days=6)
        print(f"Processing {week_start:%Y%m%d} to {week_end:%Y%m%d}...")

        weekly_dfs = []
        for date in pd.date_range(week_start, week_end):  # inclusive of both endpoints
            df_Presto = pd.read_csv(f"{csv_folder}/{date.strftime('%Y%m%d')}-Presto.csv", dtype={
                'job_id': str, 'start_time': str, 'duration': float,
                'cputime': float, 'db_name': str, 'table_name': str,
                'uown_names': str, 'inputDataSize': float,
                'outputDataSize': float, 'template_id': str
            }, na_values=['\\N'])
            df_Presto = df_Presto[['db_name', 'table_name', 'inputDataSize', 'outputDataSize',
                                   'cputime', 'uown_names']]
            test_df = df_Presto[df_Presto['uown_names'].isna()]
            if not test_df.empty:
                print(date.strftime('%Y%m%d'), "inputDataSize with NaN uown_names:",
                      human_readable_size(test_df['inputDataSize'].sum()))
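
            # Spark traces repeat a job's total cputime on every row that
            # shares a job_id, so divide by the per-job row count; summing the
            # rows later then counts each job's cputime exactly once.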
            df_Spark = pd.read_csv(f"{csv_folder}/{date.strftime('%Y%m%d')}-Spark.csv", dtype={
                'job_id': str, 'start_time': str, 'duration': float,
                'cputime': float, 'db_name': str, 'table_name': str,
                'uown_names': str, 'inputDataSize': float,
                'outputDataSize': float, 'template_id': str
            }, na_values=['\\N'])
            total_cputime = df_Spark.groupby("job_id")["cputime"].first().sum()
            # print(f"[{date.strftime('%Y%m%d')}] Total cputime of Spark jobs: {total_cputime}")
            abFP_counts = df_Spark['job_id'].value_counts()
            df_Spark["cputime"] /= df_Spark["job_id"].map(abFP_counts)
            df_Spark = df_Spark[['db_name', 'table_name', 'inputDataSize', 'outputDataSize',
                                 'cputime', 'uown_names']]
            # print(f"should == Total cputime of Spark jobs after normalization: {df_Spark['cputime'].sum()}")
            test_df = df_Spark[df_Spark['uown_names'].isna()]
            if not test_df.empty:
                print(date.strftime('%Y%m%d'), "inputDataSize with NaN uown_names:",
                      human_readable_size(test_df['inputDataSize'].sum()))

            weekly_dfs.extend([df_Spark, df_Presto])
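
        # After the day loop: combine the week's daily frames and collapse
        # them to one row per (owner, database, table).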
        merged_df = pd.concat(weekly_dfs, ignore_index=True)
        merged_df['uown_names'] = merged_df['uown_names'].fillna("")  # handle NaN owners
        merged_df = merged_df.groupby(["uown_names", "db_name", "table_name"]).agg({
            "inputDataSize": "sum", "outputDataSize": "sum", "cputime": "sum"
        }).reset_index()
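
        # Register each raw owner string with the Ownership resolver, then map
        # every row to its canonical owner and re-aggregate so rows that
        # resolve to the same owner are merged.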
print(f"# of uown_names (before): {merged_df['uown_names'].nunique()}")
for uown_names in merged_df['uown_names'].unique():
if uown_names is None:
continue
ownership.add_query_ownership(uown_names, uown_names)
merged_df['uown_names'] = merged_df['uown_names'].apply(ownership.get_query_ownership)
        merged_df = merged_df.groupby(["uown_names", "db_name", "table_name"]).agg({
            "inputDataSize": "sum", "outputDataSize": "sum", "cputime": "sum"
        }).reset_index()

        print(f"inputDataSize: {human_readable_size(merged_df['inputDataSize'].sum())}, "
              f"outputDataSize: {human_readable_size(merged_df['outputDataSize'].sum())}, "
              f"cputime: {merged_df['cputime'].sum()}")
        print(f"# of uown_names (after): {merged_df['uown_names'].nunique()}")

        merged_df.rename(columns={"uown_names": "abstractFingerPrint"}, inplace=True)
        output_path = f"{output_folder}/report-uown-volume-table-{week_start:%Y%m%d}-{week_end:%Y%m%d}.csv"
        merged_df.to_csv(output_path, index=False)
        print(f"Generated {output_path}.")


if __name__ == "__main__":
    generate_Yugong_weekly_traces()