# utility.py
def print(self):
    """Print a three-part summary (all tables / whale tables / the rest) of
    table counts, on-disk sizes, workload read sizes, cold-table stats and
    CPU time, then print per-rank whale details and dump non-code-10 whale
    tables to ``whale_tables.csv``.

    NOTE(review): this method shadows the builtin name ``print`` as an
    attribute of the class; the bare ``print(...)`` calls in the body still
    resolve to the builtin.
    Sizes appear to be stored in GiB (they are scaled by ``1024 ** 3``
    before formatting) — TODO confirm against the data source.
    """

    def get_workload_info_for_slice(slice_df, col_db='hive_database_name',
                                    col_table='hive_table_name',
                                    col_size='dir_size'):
        """Join a slice of tables against ``self.workload_df``.

        Returns a tuple ``(read_size, cold_table_count, cold_table_size,
        total_cputime)``. A table is "cold" when it has no matching
        workload row (``inputDataSize`` is NaN after a left merge).
        """
        # Work on a copy so the caller's frame is never mutated.
        slice_df_copy = slice_df.copy()
        # If the slice keys tables by a combined 'db_table' column, split it
        # into separate db/table columns so we can merge on them.
        if col_db == 'db_table' and col_table == 'db_table':
            # n=1 guards against table names that themselves contain dots:
            # only the first '.' separates database from table.
            slice_df_copy[['db_name', 'table_name']] = (
                slice_df_copy['db_table'].str.split('.', n=1, expand=True))
            col_db = 'db_name'
            col_table = 'table_name'

        # Left merge: every table in the slice, plus workload info where it
        # exists. Rows with NaN inputDataSize saw no workload -> cold.
        merged_df = slice_df_copy.merge(
            self.workload_df,
            left_on=[col_db, col_table],
            right_on=['db_name', 'table_name'],
            how='left',
        )
        cold_tables = merged_df[pd.isna(merged_df['inputDataSize'])]
        cold_table_count = len(cold_tables)
        cold_table_size = cold_tables[col_size].sum()

        # Right merge: every workload row, plus slice info where it exists.
        # Keeping only rows with a known size restricts the workload totals
        # (read size, CPU time) to tables actually present in the slice.
        workload_hits = slice_df_copy.merge(
            self.workload_df,
            left_on=[col_db, col_table],
            right_on=['db_name', 'table_name'],
            how='right',
        )
        workload_hits = workload_hits[pd.notna(workload_hits[col_size])]
        read_size = workload_hits['inputDataSize'].sum()
        total_cputime = workload_hits['cputime'].sum()
        return read_size, cold_table_count, cold_table_size, total_cputime

    # 1. Print total count and size of all tables.
    all_tables_count = len(self.all_table_df)
    all_tables_size = self.all_table_df['dir_size'].sum()
    _, all_tables_cold_count, all_tables_cold_size, _ = (
        get_workload_info_for_slice(self.all_table_df))
    print(f"Total tables: {all_tables_count} | Total size: {human_readable_size(all_tables_size * 1024 ** 3)} | "
          f"Read Size: {human_readable_size(self.total_inputDataSize * 1024 ** 3)} | "
          f"Total CPU Time: {self.total_cputime} | "
          f"Cold Tables: {all_tables_cold_count} | Cold Tables Size: {human_readable_size(all_tables_cold_size * 1024 ** 3)}")

    # 2. Print count and total size of whale source tables (keyed by a
    #    combined 'db_table' column with sizes in 'size_in_gbs').
    top300_count = len(self.whale_df)
    top300_size = self.whale_df['size_in_gbs'].sum()
    top300_read_size, top300_cold_count, top300_cold_size, top300_cputime = (
        get_workload_info_for_slice(
            self.whale_df, col_db='db_table', col_table='db_table', col_size='size_in_gbs'
        ))
    print(f"Whale tables: {top300_count} | Total size: {human_readable_size(top300_size * 1024 ** 3)} | "
          f"Read Size: {human_readable_size(top300_read_size * 1024 ** 3)} | "
          f"Total CPU Time: {top300_cputime} | "
          f"Cold Tables: {top300_cold_count} | Cold Tables Size: {human_readable_size(top300_cold_size * 1024 ** 3)}")

    # 3. Everything that is not a whale table, by subtraction.
    diff_count = all_tables_count - top300_count
    diff_size = all_tables_size - top300_size
    diff_read_size = self.total_inputDataSize - top300_read_size
    diff_cold_count = all_tables_cold_count - top300_cold_count
    diff_cold_size = all_tables_cold_size - top300_cold_size
    diff_cpu_time = self.total_cputime - top300_cputime
    print(f"Other tables: {diff_count} | Total size: {human_readable_size(diff_size * 1024 ** 3)} | "
          f"Read Size: {human_readable_size(diff_read_size * 1024 ** 3)} | "
          f"Total CPU Time: {diff_cpu_time} | "
          f"Cold Tables: {diff_cold_count} | Cold Tables Size: {human_readable_size(diff_cold_size * 1024 ** 3)}")

    # 4. Detailed analysis of whale tables in rank bands of 100.
    sorted_whale_df = self.whale_df.sort_values(by='size_in_gbs', ascending=False)
    ranks = [(0, 100), (100, 200), (200, 300)]
    for start, end in ranks:
        self.print_table_info(sorted_whale_df.iloc[start:end], start, end)

    # Dump the code != 10 whale tables (db_table, size_in_gbs) to CSV.
    # NOTE(review): semantics of 'code' == 10 are not visible here — confirm.
    sorted_whale_df[sorted_whale_df['code'] != 10][['db_table', 'size_in_gbs']].to_csv('whale_tables.csv',
                                                                                       index=False)