# scripts/anonymize.py
import csv
import datetime
import os.path
from datetime import timedelta

import pandas as pd
class Anonymization:
    """Replace sensitive identifiers in trace CSVs with stable integer ids.

    Three key -> id mappings are maintained and persisted under *dir_path*:

      - ``abFP``:  query template fingerprints (``abFP.csv``)
      - ``db``:    database names              (``db.csv``)
      - ``table``: table names                 (``table.csv``)

    Ids are assigned sequentially in first-seen order, so re-running with the
    same persisted mapping files keeps ids stable across input files.
    """

    def __init__(self, dir_path):
        """Load any previously persisted mappings found in *dir_path*."""
        self.dir_path = dir_path
        self.abFP = self._load_map('abFP.csv', 'abstractFingerPrint')
        self.db = self._load_map('db.csv', 'db_name')
        self.table = self._load_map('table.csv', 'table_name')

    def _load_map(self, file_name, key_col):
        """Return the key -> id mapping stored in *file_name*, or {} if absent."""
        path = os.path.join(self.dir_path, file_name)
        mapping = {}
        if os.path.exists(path):
            print("loading", os.path.splitext(file_name)[0])
            df = pd.read_csv(path)
            for _, row in df.iterrows():
                mapping[row[key_col]] = row['id']
        return mapping

    @staticmethod
    def _assign_id(mapping, key):
        """Register *key* with the next sequential id if it is unseen.

        Equivalent to the original counter pattern: the counter always
        started at ``len(mapping)`` and advanced once per insertion.
        """
        if key not in mapping:
            mapping[key] = len(mapping)

    @staticmethod
    def _flag_uown(df):
        """Replace free-text owner names with a has-owner boolean, in place.

        NaN or blank -> False, anything else -> True. ``str(x)`` guards
        against non-string cells (the original lambda assumed str and would
        raise AttributeError on e.g. numeric cells).
        """
        if 'uown_names' in df.columns:
            df['uown_names'] = df['uown_names'].apply(
                lambda x: not (pd.isna(x) or str(x).strip() == ""))

    def get_counters(self):
        """Print and return the current sizes of the three mappings."""
        print("existing", len(self.abFP), len(self.db), len(self.table))
        return len(self.abFP), len(self.db), len(self.table)

    def log_map(self):
        """Persist the three mappings as ``id,<key>`` CSV files in *dir_path*.

        Uses ``csv.writer`` so keys containing commas, quotes or newlines
        (common in query fingerprints) are quoted correctly and survive a
        round-trip through ``pd.read_csv``; the previous hand-rolled
        f-string writer produced unparseable rows for such keys.
        """
        specs = (
            ('abFP.csv', 'abstractFingerPrint', self.abFP),
            ('db.csv', 'db_name', self.db),
            ('table.csv', 'table_name', self.table),
        )
        for file_name, key_col, mapping in specs:
            with open(os.path.join(self.dir_path, file_name), 'w', newline='') as f:
                writer = csv.writer(f)
                writer.writerow(['id', key_col])
                for key, value in mapping.items():
                    writer.writerow([value, key])

    def anonymize_table(self, input_dir, file_name):
        """Anonymize a table-size report.

        Input header: hive_database_name,hive_table_name,dir_size,datestr,uown_names.
        Writes ``anonymized_<file_name>`` into *dir_path*.
        """
        print("processing", os.path.join(input_dir, file_name))
        df = pd.read_csv(os.path.join(input_dir, file_name))
        df = df[df['dir_size'] > 0]  # drop dir_size=0
        print("# of tables", len(df))
        print("existing", len(self.db), len(self.table))
        # Register every name before mapping so ids are first-seen ordered.
        for _, row in df.iterrows():
            self._assign_id(self.db, row['hive_database_name'])
            self._assign_id(self.table, row['hive_table_name'])
        print("mapping created")
        print("now", len(self.db), len(self.table))
        df['hive_database_name'] = df['hive_database_name'].map(self.db)
        df['hive_table_name'] = df['hive_table_name'].map(self.table)
        self._flag_uown(df)
        df.to_csv(os.path.join(self.dir_path, 'anonymized_' + file_name), index=False)

    def anonymize_job_Spark(self, input_dir, file_name):
        """Anonymize a per-day Spark job trace.

        Input header: job_id,start_time,end_time,cputime,db_name,table_name,
        inputDataSize,outputDataSize,uown_names,MB_days_per_day,template_id,
        date,read_op_count,write_op_count,db_table,total_io,duration.
        Spark rows may lack a template_id; those are encoded as -1.
        """
        print("processing", os.path.join(input_dir, file_name))
        df = pd.read_csv(os.path.join(input_dir, file_name))
        # Derivable / unneeded downstream columns.
        df.drop(columns=['end_time', 'MB_days_per_day', 'read_op_count',
                         'write_op_count', 'total_io', 'db_table'], inplace=True)
        self.get_counters()
        for _, row in df.iterrows():
            if pd.notna(row['template_id']):
                self._assign_id(self.abFP, row['template_id'])
            self._assign_id(self.db, row['db_name'])
            self._assign_id(self.table, row['table_name'])
        print("mapping updated into", len(self.abFP), len(self.db), len(self.table))
        # Missing template ids become -1 rather than NaN.
        df['template_id'] = df['template_id'].apply(
            lambda x: self.abFP.get(x) if pd.notna(x) else -1)
        df['db_name'] = df['db_name'].map(self.db)
        df['table_name'] = df['table_name'].map(self.table)
        self._flag_uown(df)
        df.to_csv(os.path.join(self.dir_path, 'anonymized_' + file_name), index=False)

    def anonymize_job_Presto(self, input_dir, file_name):
        """Anonymize a per-day Presto job trace (template_id always present).

        Input header: starttimems,job_id,template_id,walltimems,db_name,
        table_name,uown_names,date,inputDataSize,cputime,start_time,duration,
        outputDataSize.
        """
        print("processing", os.path.join(input_dir, file_name))
        df = pd.read_csv(os.path.join(input_dir, file_name))
        df.drop(columns=['starttimems', 'walltimems'], inplace=True)
        self.get_counters()
        for _, row in df.iterrows():
            self._assign_id(self.abFP, row['template_id'])
            self._assign_id(self.db, row['db_name'])
            self._assign_id(self.table, row['table_name'])
        print("mapping updated into", len(self.abFP), len(self.db), len(self.table))
        df['template_id'] = df['template_id'].map(self.abFP)
        df['db_name'] = df['db_name'].map(self.db)
        df['table_name'] = df['table_name'].map(self.table)
        self._flag_uown(df)
        df.to_csv(os.path.join(self.dir_path, 'anonymized_' + file_name), index=False)

    def anonymize_workload(self, input_dir, file_name):
        """Anonymize a weekly workload summary.

        Input header: abstractFingerPrint, db_name, table_name,
        inputDataSize, cputime.
        """
        print("processing", os.path.join(input_dir, file_name))
        df = pd.read_csv(os.path.join(input_dir, file_name))
        # Some exports use a lower-case column name; normalize it.
        if 'abstractfingerprint' in df.columns:
            df.rename(columns={'abstractfingerprint': 'abstractFingerPrint'}, inplace=True)
        self.get_counters()
        for _, row in df.iterrows():
            self._assign_id(self.abFP, row['abstractFingerPrint'])
            self._assign_id(self.db, row['db_name'])
            self._assign_id(self.table, row['table_name'])
        print("mapping created")
        print("now", len(self.abFP), len(self.db), len(self.table))
        df['abstractFingerPrint'] = df['abstractFingerPrint'].map(self.abFP)
        df['db_name'] = df['db_name'].map(self.db)
        df['table_name'] = df['table_name'].map(self.table)
        self._flag_uown(df)
        df.to_csv(os.path.join(self.dir_path, 'anonymized_' + file_name), index=False)

    def anonymize_placements(self, input_dir, dir_name):
        """Anonymize placement outputs (dataset_placement.csv, query_placement.csv).

        Requires that all names were already registered by the methods above;
        an unknown table name raises KeyError rather than silently writing NaN.
        Output goes to ``<dir_path>/<dir_name>`` (was hardcoded to
        ``../anonymize/<dir_name>``; identical for the visible caller).
        """
        print("processing", os.path.join(input_dir, dir_name))
        input_path = os.path.join(input_dir, dir_name)
        if not os.path.exists(input_path):
            # Explicit raise instead of assert (asserts vanish under -O).
            raise FileNotFoundError(input_path)
        output_path = os.path.join(self.dir_path, dir_name)
        os.makedirs(output_path, exist_ok=True)
        # header: table,z,w,size
        df = pd.read_csv(os.path.join(input_path, "dataset_placement.csv"))
        df[['db_name', 'table_name']] = df['table'].str.split('.', n=1, expand=True)
        df['db_name'] = df['db_name'].map(self.db).astype(int)

        def map_table_name(name):
            # 'group' is a placement keyword, not a real table name.
            return name if name == 'group' else str(int(self.table[name]))

        df['table_name'] = df['table_name'].apply(map_table_name)
        df['table'] = df['db_name'].astype(str) + '.' + df['table_name'].astype(str)
        df.drop(columns=['db_name', 'table_name'], inplace=True)
        df.to_csv(os.path.join(output_path, "dataset_placement.csv"), index=False)
        # header: abFP,y
        df = pd.read_csv(os.path.join(input_path, "query_placement.csv"))
        df['abFP'] = df['abFP'].map(self.abFP)
        df.to_csv(os.path.join(output_path, "query_placement.csv"), index=False)
if __name__ == '__main__':
    # Driver: anonymize the table-size report, then the weekly workload
    # summaries, then the per-day job traces, persisting the id mappings
    # after each stage so ids stay stable across runs.
    anonymizer = Anonymization('../anonymize')

    anonymizer.anonymize_table("../", 'report-table-size-20241021.csv')
    anonymizer.log_map()

    first_week = datetime.date(year=2024, month=10, day=22)
    for week in range(7):
        week_start = first_week + timedelta(days=7 * week)
        week_end = week_start + timedelta(days=6)
        span = f'{week_start.strftime("%Y%m%d")}-{week_end.strftime("%Y%m%d")}'
        for engine in ('Presto', 'Spark'):
            anonymizer.anonymize_workload("../newTraces/", f'report-abFP-volume-table-{span}-{engine}.csv')
    anonymizer.log_map()

    for day in pd.date_range(start='2024-10-22', end='2025-01-27').strftime('%Y%m%d'):
        anonymizer.anonymize_job_Presto("../jobTraces/", f'{day}-Presto.csv')
        anonymizer.anonymize_job_Spark("../jobTraces/", f'{day}-Spark.csv')
    anonymizer.log_map()

    # Not needed for external usage since the inputs should be anonymized already:
    # base_dir_path = "../sample_1.000_rep0.002"
    # anonymizer.anonymize_placements(base_dir_path, "test_run_c70_bw0.02_local30")
    # start_date = datetime.date(year=2024, month=10, day=29)
    # for i in range(6):
    #     anonymizer.anonymize_placements(base_dir_path, f"test_run_c70_bw0.02_local30_{start_date.strftime('%m%d')}")
    #     start_date += timedelta(days=7)