# placement.py
import os
import re
import pandas as pd
from utility import human_readable_size
def merge_similar_rows(df):
    """Collapse per-table rows into their database's ``<db>.group`` row.

    For each database, any row whose ``(z, w)`` placement matches that
    database's ``<db>.group`` row is folded into the group row (its size is
    added to the group's size) and dropped.  Rows with size 0 are removed
    at the end.

    Args:
        df: DataFrame with columns ``table`` (format ``<db>.<table>``),
            ``z``, ``w`` and ``size``.

    Returns:
        Tuple ``(merged_df, removed_df)``: the reduced frame (with helper
        columns ``db_name``/``table_name`` added) and a frame holding every
        row that was folded away or dropped for having size 0.

    Raises:
        ValueError: if a database has more than one ``.group`` row.
    """
    df = df.copy()
    initial_length = len(df)
    df['db_name'] = df['table'].apply(lambda x: x.split('.')[0])
    df['table_name'] = df['table'].apply(lambda x: x.split('.')[1])
    removed_rows = []  # collected so callers can inspect what was merged/dropped
    for db_name in df['db_name'].unique():
        group_row = df[df['table'] == f'{db_name}.group']
        if len(group_row) > 1:
            # Previously an `assert`, which is stripped under `python -O`
            # and would let conflicting groups be merged silently.
            raise ValueError(f"Multiple conflicting groups found for {db_name}")
        if group_row.empty:
            # print(f"[Merge Rows] {db_name}.group not found")
            continue  # nothing to merge into
        group_z, group_w = group_row['z'].values[0], group_row['w'].values[0]
        if group_z == 0 and group_w == 0:
            print(f"[Merge Rows] {db_name}.group is replicated (z=w=0), skipped")
            continue
        # Tables of this database whose placement matches the group row.
        similar_rows = df[(df['db_name'] == db_name) &
                          (df['z'] == group_z) & (df['w'] == group_w) &
                          (df['table'] != f'{db_name}.group')]
        if not similar_rows.empty:
            # Store removed rows before dropping, then fold their total
            # size into the group row.
            removed_rows.append(similar_rows.copy())
            df.loc[group_row.index, 'size'] += similar_rows['size'].sum()
            df = df.drop(similar_rows.index)
    print(f"Merging rows reduces rows from {initial_length} to {len(df)}.")
    second_length = len(df)
    removed_rows.append(df[df['size'] == 0].copy())
    # Drop rows where size is 0 — they carry no data to place.
    df = df[df['size'] > 0]
    print(f"Removing rows with size 0 reduces rows from {second_length} to {len(df)}.")
    return df, pd.concat(removed_rows)
class Placement:
    """A sequence of placement runs under one directory; compares each
    consecutive pair of runs' ``dataset_placement.csv`` files to estimate
    cloud ingress/egress volumes."""

    def __init__(self, dir_name, placement_list, total_storage):
        # Directory containing one sub-directory per placement run.
        self.dir_name = dir_name
        # Ordered run names; consecutive pairs are diffed.
        self.placement_list = placement_list
        # NOTE(review): the name says GB, but the caller at the bottom of
        # this file passes 299.12 * 1024**2 — confirm the intended unit.
        self.total_storage_gb = total_storage

    def ingress_egress(self, debug=False):
        """Diff each consecutive pair of runs and report data movement.

        For each pair, reads ``<dir>/<run>/dataset_placement.csv``
        (columns: table, z, w, size), aligns both frames on ``table``,
        and classifies each table by its (z, w) transition.

        Args:
            debug: currently unused — the debug-printing block below is
                commented out.

        Returns:
            List of dicts with keys ``from``, ``to``, ``ingress``,
            ``egress``; the last two are human-readable size strings.

        Raises:
            ValueError: if a run name lacks a ``local<int>`` fragment.
            AssertionError: if the two frames cannot be aligned on
                ``table`` even after backfilling/dropping rows.
        """
        results = []
        for i in range(len(self.placement_list) - 1):
            print(self.placement_list[i], "->", self.placement_list[i + 1])
            # header: table, z, w, size
            path_bf = os.path.join(self.dir_name, self.placement_list[i], 'dataset_placement.csv')
            # extract the number after "local" pattern from the path: xxx_local{int}xxxx
            pattern = r"local(\d+)"
            match = re.search(pattern, self.placement_list[i])
            if match:
                # fraction (0..1) of data kept on-prem before the transition
                before = int(match.group(1)) / 100
            else:
                raise ValueError("Pattern not found")
            match = re.search(pattern, self.placement_list[i + 1])
            if match:
                # fraction (0..1) of data kept on-prem after the transition
                after = int(match.group(1)) / 100
            else:
                raise ValueError("Pattern not found")
            print("before", before, "vs after", after)
            path_af = os.path.join(self.dir_name, self.placement_list[i + 1], 'dataset_placement.csv')
            # print(path_bf, path_af)
            df_bf = pd.read_csv(path_bf)
            print("# of rows in bf before merge:", len(df_bf))
            # Collapse per-table rows into <db>.group rows so the "before"
            # frame is at comparable granularity to the "after" frame.
            df_bf, _ = merge_similar_rows(df_bf)
            print("# of rows in bf after merge:", len(df_bf))
            df_af = pd.read_csv(path_af)
            print("# of rows in af before merge:", len(df_af))
            df_af = df_af[df_af['size'] > 0]  # Ensure no zero sizes before merging
            # df_af, _ = merge_similar_rows(df_af)
            print("# of rows in af after merge:", len(df_af))
            # Sort data frames by the 'table' column to ensure matching rows can be compared
            df_bf = df_bf.sort_values('table').reset_index(drop=True)
            df_af = df_af.sort_values('table').reset_index(drop=True)
            if not df_bf['table'].equals(df_af['table']):
                # Tables that appear only in the "after" run.
                new_tables = df_af[~df_af['table'].isin(df_bf['table'])]
                group_pattern = r"(.+)\.group"
                table_pattern = r"(.+)\.(.+)"
                # Refresh each .group row's size from the "after" frame so
                # the positional comparison below uses the current size.
                for index, row in df_bf.iterrows():
                    match = re.match(group_pattern, row['table'])
                    if match:
                        db_name = match.group(1)
                        af_row = df_af[df_af['table'] == row['table']]
                        if not af_row.empty:
                            df_bf.at[index, 'size'] = af_row['size'].values[0]
                # Backfill "after"-only tables into the "before" frame,
                # inheriting (z, w) from their database's group row when
                # one exists; otherwise the row is copied unchanged.
                new_entries = []
                for index, row in new_tables.iterrows():
                    match = re.match(table_pattern, row['table'])
                    if match:
                        db_name = match.group(1)
                        group_row = df_bf[df_bf['table'] == f"{db_name}.group"]
                        if not group_row.empty:
                            row['z'] = group_row['z'].values[0]
                            row['w'] = group_row['w'].values[0]
                            new_entries.append(row)
                        else:
                            new_entries.append(row)
                    else:
                        new_entries.append(row)
                new_entries_df = pd.DataFrame(new_entries)
                df_bf = pd.concat([df_bf, new_entries_df], ignore_index=True)
                df_bf = df_bf.sort_values('table').reset_index(drop=True)
                # debug
                if not df_bf['table'].equals(df_af['table']):
                    print("[Debug] Tables in bf but not in af:")
                    print(df_bf[~df_bf['table'].isin(df_af['table'])])
                    # Drop "before"-only tables so both frames align 1:1.
                    df_bf.drop(df_bf[~df_bf['table'].isin(df_af['table'])].index, inplace=True)
                    df_bf = df_bf.sort_values('table').reset_index(drop=True)
            assert df_bf['table'].equals(df_af['table']), f"Tables do not match {len(df_bf)} {len(df_af)}"
            # Calculate ingress and egress.
            # NOTE(review): the exact semantics of z/w are not visible in
            # this file; from the expressions, ingress = rows transitioning
            # (z=0,w=1) -> w=0, egress = rows transitioning (z=1,w=0) -> z=0
            # — confirm against the writer of dataset_placement.csv.
            ingress = (df_bf['z'] == 0) & (df_bf['w'] == 1) & (df_af['w'] == 0)
            egress = (df_bf['z'] == 1) & (df_bf['w'] == 0) & (df_af['z'] == 0)
            # if debug:
            #     # Print the tables that are being ingressed and egressed
            #     print("Ingressed tables:")
            #     print(df_bf[ingress])
            #     print("Egressed tables:")
            #     print(df_bf[egress])
            # Compute weighted ingress and egress (boolean mask * size
            # sums the sizes of the matching rows only).
            ingress_count = (ingress * df_bf['size']).sum()
            egress_count = (egress * df_bf['size']).sum()
            # Save the results for this pair; the 1024**3 factor implies
            # 'size' is counted in GiB — TODO confirm.
            results.append({
                'from': self.placement_list[i],
                'to': self.placement_list[i + 1],
                'ingress': human_readable_size(ingress_count * 1024 ** 3),
                'egress': human_readable_size(egress_count * 1024 ** 3)
            })
            print(f"From {self.placement_list[i]} to {self.placement_list[i + 1]}:"
                  f" ingress: {human_readable_size(ingress_count * 1024 ** 3)},"
                  f" egress: {human_readable_size(egress_count * 1024 ** 3)},"
                  f" remove {human_readable_size((self.total_storage_gb * 1024 ** 3 * (before - after)))} from onprem")
        print("Results:", results)
        return results
def main():
    """Run the ingress/egress analysis over the fixed long-term run sequence."""
    p = Placement('long_term',
                  ['test_run_c10_bw0.02_local90',
                   'test_run_c20_bw0.02_local80',
                   'test_run_c30_bw0.02_local70',
                   'test_run_c40_bw0.02_local60',
                   'test_run_c50_bw0.02_local50',
                   'test_run_c60_bw0.02_local40',
                   'test_run_c70_bw0.02_local30',
                   'test_run_c80_bw0.02_local20',
                   'test_run_c90_bw0.02_local10',
                   ],
                  total_storage=299.12 * 1024 ** 2)
    p.ingress_egress()


if __name__ == "__main__":
    # Guarding the entry point keeps importing this module side-effect free;
    # previously the analysis (filesystem reads, prints) ran on import.
    main()