in optimizer.py [0:0]
def prepare_replication(self):
    """Rank every table seen in the job log under the configured strategy.

    Tables are keyed as ``"<db_name>.<table_name>"``, scored with a
    per-strategy metric and ordered by descending score. Tables with equal
    scores keep the order in which they first appear in ``self.df``.

    Supported values of ``self.rep_strategy``:

    * ``"job_access_density"``   - rows of ``self.df`` per table / table size
    * ``"read_traffic_volume"``  - summed ``inputDataSize + outputDataSize``
    * ``"inverse_dataset_size"`` - 1 / table size
    * ``"job_access_frequency"`` - rows of ``self.df`` per table
    * ``"read_traffic_density"`` - summed traffic / table size

    Table sizes are read from the ``dir_size`` column of
    ``self.df_table_size`` (keyed by ``hive_database_name`` and
    ``hive_table_name``). Size-normalised metrics score a table 0 when its
    size is missing or not positive.

    Returns:
        dict mapping each table key to its 0-based rank (0 = highest score).

    Raises:
        AssertionError: if ``self.df`` is ``None``.
        ValueError: if ``self.rep_strategy`` is not a supported strategy.
    """

    def _column_rows(frame, *columns):
        # Yield one tuple of the requested column values per row of `frame`.
        # Column-wise iteration avoids building a Series per row (as
        # `iterrows` does) and keeps each column's own dtype.
        if len(frame) == 0:
            # No rows: nothing to yield, even if the columns are absent.
            return iter(())
        return zip(*(frame[name] for name in columns))

    def _compute_edges():
        # Number of rows in self.df that reference each table.
        edge_per_table = defaultdict(int)
        for db, table in _column_rows(self.df, 'db_name', 'table_name'):
            edge_per_table[f"{db}.{table}"] += 1
        return edge_per_table

    def _compute_access_size():
        # Total input + output bytes recorded against each table.
        access_per_table = defaultdict(int)
        rows = _column_rows(
            self.df, 'db_name', 'table_name', 'inputDataSize', 'outputDataSize'
        )
        for db, table, data_in, data_out in rows:
            access_per_table[f"{db}.{table}"] += data_in + data_out
        return access_per_table

    def _load_table_size():
        # Table key -> dir_size; a later duplicate row overwrites an earlier one.
        rows = _column_rows(
            self.df_table_size,
            'hive_database_name', 'hive_table_name', 'dir_size',
        )
        return {f"{db}.{table}": size for db, table, size in rows}

    def _per_size(numerators):
        # Divide each table's numerator by its size; 0 when the size is
        # unknown or not positive (also guards against division by zero).
        sizes = _load_table_size()
        scored = {}
        for key, value in numerators.items():
            size = sizes.get(key, 0)
            scored[key] = value / size if size > 0 else 0
        return scored

    # Explicit raise instead of `assert` so the check survives `python -O`;
    # AssertionError is kept so existing callers see the same exception type.
    if self.df is None:
        raise AssertionError("self.df must be set before prepare_replication()")

    print("Replication strategy:", self.rep_strategy)
    print("No matter read or write or both, we count as 1 edge")

    strategy = self.rep_strategy
    if strategy == "job_access_density":
        metric_per_table = _per_size(_compute_edges())
    elif strategy == "read_traffic_volume":
        metric_per_table = _compute_access_size()
    elif strategy == "inverse_dataset_size":
        # Only tables that appear in the job log are ranked, so the edge
        # counts are used purely for their keys.
        metric_per_table = _per_size({key: 1 for key in _compute_edges()})
    elif strategy == "job_access_frequency":
        metric_per_table = _compute_edges()
    elif strategy == "read_traffic_density":
        metric_per_table = _per_size(_compute_access_size())
    else:
        raise ValueError(f"Unknown replication strategy: {self.rep_strategy}")

    # sorted() is stable, also with reverse=True, so ties keep insertion order.
    ranked_tables = sorted(
        metric_per_table, key=metric_per_table.get, reverse=True
    )
    return {table: rank for rank, table in enumerate(ranked_tables)}