in placement.py [0:0]
def ingress_egress(self, debug=False):
results = []
for i in range(len(self.placement_list) - 1):
print(self.placement_list[i], "->", self.placement_list[i + 1])
# header: table, z, w, size
path_bf = os.path.join(self.dir_name, self.placement_list[i], 'dataset_placement.csv')
# extract the number after "local" pattern from the path: xxx_local{int}xxxx
pattern = r"local(\d+)"
match = re.search(pattern, self.placement_list[i])
if match:
before = int(match.group(1)) / 100
else:
raise ValueError("Pattern not found")
match = re.search(pattern, self.placement_list[i + 1])
if match:
after = int(match.group(1)) / 100
else:
raise ValueError("Pattern not found")
print("before", before, "vs after", after)
path_af = os.path.join(self.dir_name, self.placement_list[i + 1], 'dataset_placement.csv')
# print(path_bf, path_af)
df_bf = pd.read_csv(path_bf)
print("# of rows in bf before merge:", len(df_bf))
df_bf, _ = merge_similar_rows(df_bf)
print("# of rows in bf after merge:", len(df_bf))
df_af = pd.read_csv(path_af)
print("# of rows in af before merge:", len(df_af))
df_af = df_af[df_af['size'] > 0] # Ensure no zero sizes before merging
# df_af, _ = merge_similar_rows(df_af)
print("# of rows in af after merge:", len(df_af))
# Sort data frames by the 'table' column to ensure matching rows can be compared
df_bf = df_bf.sort_values('table').reset_index(drop=True)
df_af = df_af.sort_values('table').reset_index(drop=True)
if not df_bf['table'].equals(df_af['table']):
new_tables = df_af[~df_af['table'].isin(df_bf['table'])]
group_pattern = r"(.+)\.group"
table_pattern = r"(.+)\.(.+)"
for index, row in df_bf.iterrows():
match = re.match(group_pattern, row['table'])
if match:
db_name = match.group(1)
af_row = df_af[df_af['table'] == row['table']]
if not af_row.empty:
df_bf.at[index, 'size'] = af_row['size'].values[0]
new_entries = []
for index, row in new_tables.iterrows():
match = re.match(table_pattern, row['table'])
if match:
db_name = match.group(1)
group_row = df_bf[df_bf['table'] == f"{db_name}.group"]
if not group_row.empty:
row['z'] = group_row['z'].values[0]
row['w'] = group_row['w'].values[0]
new_entries.append(row)
else:
new_entries.append(row)
else:
new_entries.append(row)
new_entries_df = pd.DataFrame(new_entries)
df_bf = pd.concat([df_bf, new_entries_df], ignore_index=True)
df_bf = df_bf.sort_values('table').reset_index(drop=True)
# debug
if not df_bf['table'].equals(df_af['table']):
print("[Debug] Tables in bf but not in af:")
print(df_bf[~df_bf['table'].isin(df_af['table'])])
df_bf.drop(df_bf[~df_bf['table'].isin(df_af['table'])].index, inplace=True)
df_bf = df_bf.sort_values('table').reset_index(drop=True)
assert df_bf['table'].equals(df_af['table']), f"Tables do not match {len(df_bf)} {len(df_af)}"
# Calculate ingress and egress
ingress = (df_bf['z'] == 0) & (df_bf['w'] == 1) & (df_af['w'] == 0)
egress = (df_bf['z'] == 1) & (df_bf['w'] == 0) & (df_af['z'] == 0)
# if debug:
# # Print the tables that are being ingressed and egressed
# print("Ingressed tables:")
# print(df_bf[ingress])
# print("Egressed tables:")
# print(df_bf[egress])
# Compute weighted ingress and egress
ingress_count = (ingress * df_bf['size']).sum()
egress_count = (egress * df_bf['size']).sum()
# Save the results for this pair
results.append({
'from': self.placement_list[i],
'to': self.placement_list[i + 1],
'ingress': human_readable_size(ingress_count * 1024 ** 3),
'egress': human_readable_size(egress_count * 1024 ** 3)
})
print(f"From {self.placement_list[i]} to {self.placement_list[i + 1]}:"
f" ingress: {human_readable_size(ingress_count * 1024 ** 3)},"
f" egress: {human_readable_size(egress_count * 1024 ** 3)},"
f" remove {human_readable_size((self.total_storage_gb * 1024 ** 3 * (before - after)))} from onprem")
print("Results:", results)
return results