# scripts/anonymize.py

import datetime
import os.path
from datetime import timedelta

import pandas as pd


class Anonymization:
    """Anonymize trace CSVs by replacing identifiers with integer ids.

    Maintains three persistent string -> int mappings (query fingerprints,
    database names, table names).  Mappings are loaded from / saved to CSV
    files inside ``dir_path`` so ids stay stable across runs, and anonymized
    copies of the input traces are written to ``dir_path`` as
    ``anonymized_<file_name>``.
    """

    # Persistent map files kept in dir_path: (attribute, csv file, value column).
    _MAP_SPECS = (
        ('abFP', 'abFP.csv', 'abstractFingerPrint'),
        ('db', 'db.csv', 'db_name'),
        ('table', 'table.csv', 'table_name'),
    )

    def __init__(self, dir_path):
        """Load any previously persisted mappings from ``dir_path``."""
        self.dir_path = dir_path
        for attr, fname, col in self._MAP_SPECS:
            mapping = {}
            path = os.path.join(self.dir_path, fname)
            if os.path.exists(path):
                print("loading", attr)
                df = pd.read_csv(path)
                for _, row in df.iterrows():
                    mapping[row[col]] = row['id']
            setattr(self, attr, mapping)

    def get_counters(self):
        """Return current sizes of the (abFP, db, table) mappings."""
        print("existing", len(self.abFP), len(self.db), len(self.table))
        return len(self.abFP), len(self.db), len(self.table)

    def log_map(self):
        """Persist all three mappings to their CSV files in ``dir_path``."""
        for attr, fname, col in self._MAP_SPECS:
            with open(os.path.join(self.dir_path, fname), 'w') as f:
                f.write(f'id,{col}\n')
                for key, value in getattr(self, attr).items():
                    f.write(f'{value},{key}\n')

    @staticmethod
    def _register(mapping, values):
        """Assign fresh sequential ids to unseen values, in first-seen order.

        NaN values are skipped (a NaN dict key is unusable: NaN != NaN, so it
        could never be looked up again).  Returns the new mapping size.
        """
        counter = len(mapping)
        # pd.unique preserves order of first appearance, so ids are assigned
        # exactly as the original per-row loop would have assigned them.
        for value in pd.unique(values):
            if pd.isna(value):
                continue
            if value not in mapping:
                mapping[value] = counter
                counter += 1
        return counter

    @staticmethod
    def _anonymize_uown(df):
        """Replace the 'uown_names' column with a has-owner boolean, if present."""
        if 'uown_names' in df.columns:
            # str(x) guards against non-string cells; NaN / blank -> False.
            df['uown_names'] = df['uown_names'].apply(
                lambda x: not (pd.isna(x) or str(x).strip() == ""))

    def anonymize_table(self, input_dir, file_name):
        """Anonymize a table-size report; writes anonymized_<file_name>."""
        print("processing", os.path.join(input_dir, file_name))
        # header: hive_database_name,hive_table_name,dir_size,datestr,uown_names
        df = pd.read_csv(os.path.join(input_dir, file_name))
        df = df[df['dir_size'] > 0]  # drop dir_size=0
        print("# of tables", len(df))
        counter_d = len(self.db)
        counter_t = len(self.table)
        print("existing", counter_d, counter_t)
        counter_d = self._register(self.db, df['hive_database_name'])
        counter_t = self._register(self.table, df['hive_table_name'])
        print("mapping created")
        print("now", counter_d, counter_t)
        # Apply mappings to anonymize the data
        df['hive_database_name'] = df['hive_database_name'].map(self.db)
        df['hive_table_name'] = df['hive_table_name'].map(self.table)
        self._anonymize_uown(df)
        df.to_csv(os.path.join(self.dir_path, 'anonymized_' + file_name),
                  index=False)

    def anonymize_job_Spark(self, input_dir, file_name):
        """Anonymize a Spark job trace; writes anonymized_<file_name>.

        Drops derived/duplicate columns before anonymizing.  NaN template
        ids are mapped to the sentinel -1.
        """
        print("processing", os.path.join(input_dir, file_name))
        # header: job_id,start_time,end_time,cputime,db_name,table_name,
        # inputDataSize,outputDataSize,uown_names,MB_days_per_day,template_id,
        # date,read_op_count,write_op_count,db_table,total_io,duration
        df = pd.read_csv(os.path.join(input_dir, file_name))
        df.drop(columns=['end_time', 'MB_days_per_day', 'read_op_count',
                         'write_op_count', 'total_io', 'db_table'],
                inplace=True)
        counter_a, counter_d, counter_t = self.get_counters()
        counter_a = self._register(self.abFP, df['template_id'])
        counter_d = self._register(self.db, df['db_name'])
        counter_t = self._register(self.table, df['table_name'])
        print("mapping updated into", counter_a, counter_d, counter_t)
        # Apply mappings to anonymize the data; NaN template_id -> -1.
        df['template_id'] = df['template_id'].apply(
            lambda x: self.abFP.get(x) if pd.notna(x) else -1)
        df['db_name'] = df['db_name'].map(self.db)
        df['table_name'] = df['table_name'].map(self.table)
        self._anonymize_uown(df)
        df.to_csv(os.path.join(self.dir_path, 'anonymized_' + file_name),
                  index=False)

    def anonymize_job_Presto(self, input_dir, file_name):
        """Anonymize a Presto job trace; writes anonymized_<file_name>."""
        print("processing", os.path.join(input_dir, file_name))
        # header: starttimems,job_id,template_id,walltimems,
        # db_name,table_name,uown_names,date,inputDataSize,cputime,start_time,duration,outputDataSize
        df = pd.read_csv(os.path.join(input_dir, file_name))
        df.drop(columns=['starttimems', 'walltimems'], inplace=True)
        counter_a, counter_d, counter_t = self.get_counters()
        counter_a = self._register(self.abFP, df['template_id'])
        counter_d = self._register(self.db, df['db_name'])
        counter_t = self._register(self.table, df['table_name'])
        print("mapping updated into", counter_a, counter_d, counter_t)
        # Apply mappings to anonymize the data
        df['template_id'] = df['template_id'].map(self.abFP)
        df['db_name'] = df['db_name'].map(self.db)
        df['table_name'] = df['table_name'].map(self.table)
        self._anonymize_uown(df)
        df.to_csv(os.path.join(self.dir_path, 'anonymized_' + file_name),
                  index=False)

    def anonymize_workload(self, input_dir, file_name):
        """Anonymize a weekly workload report; writes anonymized_<file_name>."""
        print("processing", os.path.join(input_dir, file_name))
        # header: abstractFingerPrint, db_name, table_name, inputDataSize, cputime
        df = pd.read_csv(os.path.join(input_dir, file_name))
        # Normalize column name across report variants.
        if 'abstractfingerprint' in df.columns:
            df.rename(columns={'abstractfingerprint': 'abstractFingerPrint'},
                      inplace=True)
        counter_a, counter_d, counter_t = self.get_counters()
        counter_a = self._register(self.abFP, df['abstractFingerPrint'])
        counter_d = self._register(self.db, df['db_name'])
        counter_t = self._register(self.table, df['table_name'])
        print("mapping created")
        print("now", counter_a, counter_d, counter_t)
        # Apply mappings to anonymize the data
        df['abstractFingerPrint'] = df['abstractFingerPrint'].map(self.abFP)
        df['db_name'] = df['db_name'].map(self.db)
        df['table_name'] = df['table_name'].map(self.table)
        self._anonymize_uown(df)
        df.to_csv(os.path.join(self.dir_path, 'anonymized_' + file_name),
                  index=False)

    def anonymize_placements(self, input_dir, dir_name):
        """Anonymize placement result files for one run directory.

        Reads dataset_placement.csv / query_placement.csv from
        ``input_dir/dir_name`` and writes anonymized copies under
        ``../anonymize/<dir_name>``.

        Raises:
            FileNotFoundError: if the input directory does not exist.
        """
        print("processing", os.path.join(input_dir, dir_name))
        input_path = os.path.join(input_dir, dir_name)
        if not os.path.exists(input_path):
            # Explicit error instead of assert (asserts vanish under -O).
            raise FileNotFoundError(input_path)
        output_path = os.path.join(f"../anonymize/{dir_name}")
        os.makedirs(output_path, exist_ok=True)
        # header: table,z,w,size — 'table' is "<db>.<table>"
        df = pd.read_csv(os.path.join(input_path, "dataset_placement.csv"))
        df[['db_name', 'table_name']] = df['table'].str.split('.', n=1,
                                                              expand=True)
        df['db_name'] = df['db_name'].map(self.db).astype(int)

        def map_table_name(name):
            # 'group' is a placeholder row kept as-is; everything else must
            # already be present in the table mapping.
            return name if name == 'group' else str(int(self.table[name]))

        df['table_name'] = df['table_name'].apply(map_table_name)
        df['table'] = (df['db_name'].astype(str) + '.'
                       + df['table_name'].astype(str))
        df.drop(columns=['db_name', 'table_name'], inplace=True)
        df.to_csv(os.path.join(output_path, "dataset_placement.csv"),
                  index=False)
        # header: abFP,y
        df = pd.read_csv(os.path.join(input_path, "query_placement.csv"))
        df['abFP'] = df['abFP'].map(self.abFP)
        df.to_csv(os.path.join(output_path, "query_placement.csv"),
                  index=False)


if __name__ == '__main__':
    a = Anonymization('../anonymize')

    a.anonymize_table("../", 'report-table-size-20241021.csv')
    a.log_map()

    start_date = datetime.date(year=2024, month=10, day=22)
    for i in range(7):
        day = start_date + timedelta(days=i * 7)
        end_date = day + timedelta(days=6)
        a.anonymize_workload(
            "../newTraces/",
            f'report-abFP-volume-table-{day.strftime("%Y%m%d")}-{end_date.strftime("%Y%m%d")}-Presto.csv')
        a.anonymize_workload(
            "../newTraces/",
            f'report-abFP-volume-table-{day.strftime("%Y%m%d")}-{end_date.strftime("%Y%m%d")}-Spark.csv')
    a.log_map()

    day_ranges = pd.date_range(start='2024-10-22',
                               end='2025-01-27').strftime('%Y%m%d').tolist()
    for day in day_ranges:
        a.anonymize_job_Presto("../jobTraces/", f'{day}-Presto.csv')
        a.anonymize_job_Spark("../jobTraces/", f'{day}-Spark.csv')
    a.log_map()

    """
    Not needed for external usage since the inputs should be anonymized already
    """
    # base_dir_path = "../sample_1.000_rep0.002"
    # a.anonymize_placements(base_dir_path, "test_run_c70_bw0.02_local30")
    # start_date = datetime.date(year=2024, month=10, day=29)
    # for i in range(6):
    #     a.anonymize_placements(base_dir_path, f"test_run_c70_bw0.02_local30_{start_date.strftime('%m%d')}")
    #     start_date += timedelta(days=7)