utility.py (227 lines of code) (raw):

import re import pandas as pd import random def to_seconds(interval_str: str): # Split the string into day and time components day_part, time_part = interval_str.split(' ') # Extract the day, and convert it to seconds (1 day = 86400 seconds) day_in_seconds = int(day_part) * 86400 # 24*60*60 # Split the time string into hours, minutes, seconds, and milliseconds time_part = time_part.split('.')[0] # omitting milliseconds for simplicity hours, minutes, seconds = map(int, time_part.split(':')) # hours, minutes, seconds, milliseconds = map(int, time_part.split(':')) # Convert time to seconds time_in_seconds = hours * 3600 + minutes * 60 + seconds # we are omitting milliseconds for simplicity return day_in_seconds + time_in_seconds def to_interval(seconds): days = int(seconds // 86400) seconds = int(seconds % 86400) hours = int(seconds // 3600) seconds = int(seconds % 3600) minutes = int(seconds // 60) seconds = int(seconds % 60) return f"{days:02d} {hours:02d}:{minutes:02d}:{seconds:02d}.000" def get_binary_outcome(p): """ Returns 1 with probability p and 0 with probability (1-p). Parameters: - p (float): Probability of returning 1. Must be between 0 and 1. Returns: - int: 1 or 0. """ if not 0 <= p <= 1: raise ValueError("p must be between 0 and 1") return 1 if random.random() < p else 0 def parse_size(size_str): size_str = size_str.strip() units = {"B": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3, "TB": 1024 ** 4, "PB": 1024 ** 5} size, unit = re.match(r"([\d.]+)\s*(\w+)", size_str).groups() return float(size) * units[unit] def egress_cost_calculator(egress_bytes, duration_in_months): # 0.02$ per GiB price_per_gib = 0.02 return egress_bytes * price_per_gib / 1024 ** 3 """ egress to Internet def egress_cost_calculator(egress_bytes, time_in_months, tier="Premium"): # Pricing tiers in TiB and corresponding cost per GiB in USD pricing_tiers = [ (1, 0.12), # 0-1 TiB (10, 0.11), # 1-10 TiB (float('inf'), 0.08) # 10+ TiB ] # Convert bytes to TiB egress_tib = egress_bytes / (1024 ** 4) # Adjust for the time period egress_tib_per_month = egress_tib / time_in_months total_cost = 0 remaining_tib = egress_tib_per_month for limit, cost_per_gib in pricing_tiers: if remaining_tib <= 0: break if remaining_tib > limit: cost_tib = limit else: cost_tib = remaining_tib total_cost += cost_tib * 1024 * cost_per_gib remaining_tib -= cost_tib return total_cost * time_in_months """ # # Example usage: # egress_bytes = 5 * 1024 ** 4 # 5 TiB # time_in_months = 0.25 # 1/4 month # # cost = egress_cost_calculator(egress_bytes, time_in_months) # print(f"The total egress cost is: ${cost:.2f}") def human_readable_size(size_in_bytes, decimal_places=2): if size_in_bytes < 0: return "-" + human_readable_size(-size_in_bytes) if size_in_bytes < 1024: return f"{size_in_bytes}B" elif size_in_bytes < 1024 ** 2: return f"{size_in_bytes / 1024:.{decimal_places}f}KB" elif size_in_bytes < 1024 ** 3: return f"{size_in_bytes / (1024 ** 2):.{decimal_places}f}MB" elif size_in_bytes < 1024 ** 4: return f"{size_in_bytes / (1024 ** 3):.{decimal_places}f}GB" elif size_in_bytes < 1024 ** 5: return f"{size_in_bytes / (1024 ** 4):.{decimal_places}f}TB" else: return f"{size_in_bytes / (1024 ** 5):.{decimal_places}f}PB" # not useful now class Cache(): def __init__(self, maxsize: int, kv_store): self.kv_store = kv_store # dict self.maxsize = maxsize self.cache = {} self.queue = [] self.hits = 0 self.misses = 0 def hit_rate(self): if self.hits + self.misses == 0: return 0 return self.hits / (self.hits + self.misses) def get(self, key, default=None): if key in self.cache: self.hits += 1 return self.cache[key] self.misses += 1 if key in self.kv_store: value = self.kv_store[key] self.put(key, value) return value return default def put(self, key, value): if len(self.cache) >= self.maxsize: evict_key = self.queue.pop(0) self.cache.pop(evict_key) self.cache[key] = value self.queue.append(key) """ Deprecated since too coarse-grained """ class InterNewFPWorkload: def __init__(self): self.df_list = [] assert False, "TODO" def abFP_to_table_group(self, input_path, output_path=None, persist=False): # Step 1: Read the CSV file input_df = pd.read_csv(input_path) # #Sample data for demonstration purposes # data = { # 'abstractFingerPrint': ['fp1', 'fp1', 'fp2', 'fp2', 'fp3'], # 'db_name': ['db1', 'db2', 'db1', 'db2', 'db1'], # 'table_name': ['table1', 'table2', 'table1', 'table2', 'table1'], # 'inputDataSize': [10, 20, 30, 40, 90], # 'cputime': [10, 10, 40, 40, 90] # Just adding some cputime values for completeness # 'count': [2, 2, 1, 1, 7] # } # input_df = pd.DataFrame(data) # Create a concatenated db.table column input_df['db_table'] = input_df['db_name'] + '.' + input_df['table_name'] # Create a key for unique db.table combinations for each abstractFingerPrint input_df['group_key'] = input_df.groupby('abstractFingerPrint')['db_table'].transform( lambda x: '.'.join(sorted(set(x)))) # Create a mapping for the group_key to a new abstractFingerPrint mapping = {k: f"newFP{i + 1}" for i, k in enumerate(input_df['group_key'].unique())} input_df['new_abstractFingerPrint'] = input_df['group_key'].map(mapping) # Now, aggregate inputDataSize and cputime agg_df = input_df.groupby(['new_abstractFingerPrint', 'db_name', 'table_name']).agg( # {'inputDataSize': 'sum', # 'cputime': 'sum'} inputDataSize=pd.NamedAgg(column='inputDataSize', aggfunc='sum'), cputime=pd.NamedAgg(column='cputime', aggfunc='sum'), count=pd.NamedAgg(column='unique_queryid_count', aggfunc='sum'), group_key=pd.NamedAgg(column='group_key', aggfunc='first') ).reset_index() # Drop the intermediary columns if needed # agg_df.drop(columns=['new_abstractFingerPrint'], inplace=True, errors='ignore') # Output the DataFrame # print(agg_df) # Step 6: Write to a new CSV file if persist: self.df_list.append(agg_df) if output_path: # TODO: cannot work with compare() # drop 'group_key' column agg_df.drop(columns=['group_key'], inplace=True, errors='ignore') agg_df.to_csv(output_path, index=False) def compare_table_groups(self): if not self.df_list: print("No data to compare") return group_key_sets = [set(self.df_list[i]['group_key'].unique()) for i in range(len(self.df_list))] last_set = group_key_sets[0] total_set = last_set common_set = last_set print(f"# table groups in df1 {len(last_set)}") for i in range(1, len(self.df_list)): group_key_set = group_key_sets[i] print(f"# table groups in df{i + 1} {len(group_key_set)}") new_set = group_key_set.difference(last_set) print(f"# table groups in df{i + 1} not in df{i} {len(new_set)}") common_set = common_set.intersection(group_key_set) print(f"# table groups in df1 to df{i + 1} {len(common_set)} in common") never_seen_set = group_key_set.difference(total_set) print(f"# table groups never seen before {len(never_seen_set)}") # how these tables contribute to the total cputime cputime = self.df_list[i]['cputime'].sum() cputime_new = self.df_list[i][self.df_list[i]['group_key'].isin(never_seen_set)]['cputime'].sum() print(f"Total cputime in df{i + 1} {cputime}, cputime of new table groups {cputime_new}, " f"ratio {cputime_new / cputime * 100:.2f}%") total_set = total_set.union(group_key_set) print(f"Total # table groups: {len(total_set)}") # test = InterNewFPWorkload() # test.abFP_to_table_group('report-abFP-volume-table-0818-0824.csv', # # 'report-newFP-volume-table-0818-0824.csv', # persist=True) # test.abFP_to_table_group('report-abFP-volume-table-0825-0831.csv', # # 'report-newFP-volume-table-0825-0831.csv', # persist=True) # test.abFP_to_table_group('report-abFP-volume-table-0901-0907.csv', # # 'report-newFP-volume-table-0901-0907.csv', # persist=True) # test.abFP_to_table_group('report-abFP-volume-table-0908-0914.csv', # # 'report-newFP-volume-table-0908-0914.csv', # persist=True) # test.abFP_to_table_group('report-abFP-volume-table-0915-0921.csv', # # 'report-newFP-volume-table-0915-0921.csv', # persist=True) # test.compare() """ Mostly to compare workloads in different time periods other settings are usually the same """ class Whale: # big and unpopular tables def __init__(self, whale_source_path, all_table_source_path, workload_source_path): # header: code,category,db_table,size_in_gbs self.whale_df = pd.read_csv(whale_source_path) # header: hive_database_name,hive_table_name,dir_size (Bytes) self.all_table_df = pd.read_csv(all_table_source_path) # into GB self.all_table_df['dir_size'] = self.all_table_df['dir_size'] / 1024 ** 3 # filter our size = 0 self.all_table_df = self.all_table_df[self.all_table_df['dir_size'] > 0] # header: db_name,table_name,inputDataSize (Bytes), cputime (Interval) self.workload_df = pd.read_csv(workload_source_path) self.workload_df['inputDataSize'] = self.workload_df['inputDataSize'] / 1024 ** 3 self.workload_df['cputime'] = self.workload_df['cputime'].apply(to_seconds) # verify the workload_df self.total_inputDataSize = self.workload_df['inputDataSize'].sum() self.total_cputime = self.workload_df['cputime'].sum() print("Total inputDataSize: ", self.total_inputDataSize) print("Total cputime: ", self.total_cputime) def print_table_info(self, df_slice, start_rank, end_rank): def get_workload_info_for_slice(slice_df): # Create a copy of the slice slice_df_copy = slice_df.copy() # Splitting the db_table column into two columns slice_df_copy[['split_db_name', 'split_table_name']] = slice_df_copy['db_table'].str.split('.', expand=True) # Merge with workload_df on table names merged_df = slice_df_copy.merge( self.workload_df, left_on=['split_db_name', 'split_table_name'], right_on=['db_name', 'table_name'], how='left' ) # read_size_ = merged_df['inputDataSize'].sum() # total_cputime = merged_df['cputime'].sum() cold_tables = merged_df[pd.isna(merged_df['inputDataSize'])] cold_table_count = len(cold_tables) cold_table_size = cold_tables['size_in_gbs'].sum() test_df = slice_df_copy.merge( self.workload_df, left_on=['split_db_name', 'split_table_name'], right_on=['db_name', 'table_name'], how='right' ) test_df = test_df[pd.notna(test_df['size_in_gbs'])] read_size_ = test_df['inputDataSize'].sum() total_cputime = test_df['cputime'].sum() return read_size_, cold_table_count, cold_table_size, total_cputime for code_val, desc in [(10, 'code=10'), (None, 'code!=10')]: curr_slice = df_slice[df_slice['code'] == code_val] if code_val is not None else df_slice[ df_slice['code'] != 10] read_size, cold_count, cold_size, total_cputime = get_workload_info_for_slice(curr_slice) print( f"{start_rank}~{end_rank} tables with {desc}: Count: {len(curr_slice)} " f"| Size: {human_readable_size(curr_slice['size_in_gbs'].sum() * 1024 ** 3)} " f"| Read Size: {human_readable_size(read_size * 1024 ** 3)} " f"| Total CPU Time: {total_cputime} " f"| Cold Tables: {cold_count} | Cold Tables Size: {human_readable_size(cold_size * 1024 ** 3)}" ) def print(self): def get_workload_info_for_slice(slice_df, col_db='hive_database_name', col_table='hive_table_name', col_size='dir_size'): # Create a copy of the slice slice_df_copy = slice_df.copy() # Check if we need to split the db_table column if col_db == 'db_table' and col_table == 'db_table': slice_df_copy[['db_name', 'table_name']] = slice_df_copy['db_table'].str.split('.', expand=True) col_db = 'db_name' col_table = 'table_name' # Merge with workload_df on table names merged_df = slice_df_copy.merge( self.workload_df, left_on=[col_db, col_table], right_on=['db_name', 'table_name'], how='left' ) # read_size = merged_df['inputDataSize'].sum() # total_cputime = merged_df['cputime'].sum() cold_tables = merged_df[pd.isna(merged_df['inputDataSize'])] cold_table_count = len(cold_tables) cold_table_size = cold_tables[col_size].sum() test_df = slice_df_copy.merge( self.workload_df, left_on=[col_db, col_table], right_on=['db_name', 'table_name'], how='right' ) test_df = test_df[pd.notna(test_df[col_size])] read_size = test_df['inputDataSize'].sum() total_cputime = test_df['cputime'].sum() return read_size, cold_table_count, cold_table_size, total_cputime # 1. Print total count and size of all tables all_tables_count = len(self.all_table_df) all_tables_size = self.all_table_df['dir_size'].sum() _, all_tables_cold_count, all_tables_cold_size, _ = ( get_workload_info_for_slice(self.all_table_df)) print(f"Total tables: {all_tables_count} | Total size: {human_readable_size(all_tables_size * 1024 ** 3)} | " f"Read Size: {human_readable_size(self.total_inputDataSize * 1024 ** 3)} | " f"Total CPU Time: {self.total_cputime} | " f"Cold Tables: {all_tables_cold_count} | Cold Tables Size: {human_readable_size(all_tables_cold_size * 1024 ** 3)}") # 2. Print count and total size of whale source tables top300_count = len(self.whale_df) top300_size = self.whale_df['size_in_gbs'].sum() top300_read_size, top300_cold_count, top300_cold_size, top300_cputime = ( get_workload_info_for_slice( self.whale_df, col_db='db_table', col_table='db_table', col_size='size_in_gbs' )) print(f"Whale tables: {top300_count} | Total size: {human_readable_size(top300_size * 1024 ** 3)} | " f"Read Size: {human_readable_size(top300_read_size * 1024 ** 3)} | " f"Total CPU Time: {top300_cputime} | " f"Cold Tables: {top300_cold_count} | Cold Tables Size: {human_readable_size(top300_cold_size * 1024 ** 3)}") # 3. Print difference in count & size diff_count = all_tables_count - top300_count diff_size = all_tables_size - top300_size diff_read_size = self.total_inputDataSize - top300_read_size diff_cold_count = all_tables_cold_count - top300_cold_count diff_cold_size = all_tables_cold_size - top300_cold_size diff_cpu_time = self.total_cputime - top300_cputime print(f"Other tables: {diff_count} | Total size: {human_readable_size(diff_size * 1024 ** 3)} | " f"Read Size: {human_readable_size(diff_read_size * 1024 ** 3)} | " f"Total CPU Time: {diff_cpu_time} | " f"Cold Tables: {diff_cold_count} | Cold Tables Size: {human_readable_size(diff_cold_size * 1024 ** 3)}") # 4. Detailed analysis of whale tables sorted_whale_df = self.whale_df.sort_values(by='size_in_gbs', ascending=False) # Loop through the table ranges and print info ranks = [(0, 100), (100, 200), (200, 300)] for start, end in ranks: self.print_table_info(sorted_whale_df.iloc[start:end], start, end) # pour out the code!=10 tables with db_table,size_in_gbs columns sorted_whale_df[sorted_whale_df['code'] != 10][['db_table', 'size_in_gbs']].to_csv('whale_tables.csv', index=False) # whale = Whale('whale_tables_source.csv', # 'report-table-size.csv', # 'report-abFP-volume-table-0908-0914.csv') # whale.print()