# optimizer.py
import pandas as pd
import numpy as np
from gurobipy import Model, GRB, LinExpr
import os
import time
from utility import human_readable_size, to_seconds
import sys
from Yugong.Ownership import Ownership
from collections import defaultdict
def print_time(start_time, end_time, msg, file=sys.stdout):
print(msg, f"{end_time - start_time:.2f}", "seconds", file=file, flush=True)
def merge_similar_rows(df, yugong: bool):
df = df.copy()
initial_length = len(df)
df['db_name'] = df['table'].apply(lambda x: x.split('.')[0])
df['table_name'] = df['table'].apply(lambda x: x.split('.')[1])
removed_rows = [] # for debug
if not yugong:
for db_name in df['db_name'].unique():
group_row = df[df['table'] == f'{db_name}.group']
            assert len(group_row) <= 1, f"Multiple conflicting {db_name}.group rows found"
            if group_row.empty:
                print(f"[Merge Rows] {db_name}.group not found")
                continue  # skip databases that have no group row
group_z, group_w = group_row['z'].values[0], group_row['w'].values[0]
if group_z == 0 and group_w == 0:
print(f"[Merge Rows] {db_name}.group is replicated (z=w=0), skipped")
continue
similar_rows = df[(df['db_name'] == db_name) &
(df['z'] == group_z) & (df['w'] == group_w) &
(df['table'] != f'{db_name}.group')]
if not similar_rows.empty:
# Store removed rows before dropping
removed_rows.append(similar_rows.copy())
# update group size to add up
group_size = similar_rows['size'].sum()
df.loc[group_row.index, 'size'] += group_size
df = df.drop(similar_rows.index)
print(f"Merging rows reduces rows from {initial_length} to {len(df)}.")
second_length = len(df)
removed_rows.append(df[df['size'] == 0].copy())
# Drop rows where size is 0
df = df[df['size'] > 0]
print(f"Removing rows with size 0 reduces rows from {second_length} to {len(df)}.")
return df, pd.concat(removed_rows)
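# Illustrative sketch, not called anywhere in this module: how
# merge_similar_rows() folds per-table rows into their database's `.group`
# row. The table names and sizes below are made up for demonstration.
def _example_merge_similar_rows():
    toy = pd.DataFrame({
        'table': ['db1.group', 'db1.t1', 'db1.t2'],
        'z': [1, 1, 0],
        'w': [0, 0, 1],
        'size': [10.0, 5.0, 7.0],
    })
    merged, removed = merge_similar_rows(toy, yugong=False)
    # db1.t1 shares (z=1, w=0) with db1.group, so its 5.0 GB is folded into the
    # group row (now 15.0) and the row is dropped; db1.t2 differs and is kept.
    return merged, removed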
class Query_on_DB_Table:
def __init__(self, job_data_access_df,
workload_print_info,
db_table_size_file_name,
rep_threshold=None,
rep_strategy="",
yugong=False,
ownership=None,
rep_list=None,
k=1.0,
log_dir='.',
):
self.df = None
self.previous_placement_path = None
self.prev_z = None
self.prev_w = None
self.db_table_num = None
self.abFP_num = None
self.total_storage_gb = None
self.dataset_num = None
self.job_data_access_df = job_data_access_df
self.workload_print_info = workload_print_info
print("workload info", workload_print_info)
self.db_table_size_path = db_table_size_file_name
print("db_table_size path", db_table_size_file_name)
self.rep_threshold = rep_threshold
self.rep_strategy = rep_strategy
self.rep_constr = []
self.rep_list = []
self.yugong = yugong
self.ownership = None
if yugong:
print("** Yugong mode **")
assert ownership is not None, "Ownership must be provided in Yugong mode"
assert rep_list is not None, "Replication list must be provided in Yugong mode (to align with Moirai)"
self.ownership = ownership
self.rep_list = rep_list
self.s = None
self.adj_list_input = defaultdict(dict) # Maps table id -> {job id: input_size}
self.adj_list_output = defaultdict(dict) # Maps table id -> {job id: output_size}
self.c = None
self.k = k
self.X_scale = 0
self.load_workload()
assert self.X_scale > 0
self.df_table_size = None
self.prepare_db_table_size(db_table_size_file_name)
assert self.df_table_size is not None
self.unique_abFP = {}
self.unique_db_tables = self.prepare_replication()
self.prepare_workload()
self.model = None # gurobi model
self.prepare_basic_model()
self.y, self.z, self.w, self.u, self.v = (None, None, None, None, None)
self.log_dir = log_dir
self.add_y_z_w_u_v(self.abFP_num, self.dataset_num, True, log_dir=log_dir)
self.workload_constrs = self.add_workload_constr()
def __del__(self):
if hasattr(self, 'model') and self.model is not None:
self.model.dispose()
def restore_unique_db_tables(self, file_path, log_dir=None):
prev_placement_df = pd.read_csv(file_path)
prev_placement_df['table'] = prev_placement_df['table'].astype(str)
prev_placement_df, removed_df = merge_similar_rows(prev_placement_df, self.yugong)
if log_dir is not None:
removed_df.to_csv(os.path.join(log_dir, 'removed_rows.csv'), index=False)
print("clean up unique_db_tables", len(self.unique_db_tables), "to", len(prev_placement_df))
        self.unique_db_tables = {
            db_table: idx for idx, db_table in enumerate(prev_placement_df['table'])
        }
    def update_workload(self, job_data_access_df, workload_print_info, log_dir=None):
# TODO: db_table_size_file_name is not updated
print("workload path from", self.workload_print_info, "to", workload_print_info)
self.job_data_access_df = job_data_access_df
self.workload_print_info = workload_print_info
        # TODO: no decaying; the new workload completely replaces the old one
        # Updates self.df and self.c; the self.abFP_num / self.db_table_num set
        # in load_workload() are provisional and recomputed in prepare_workload()
self.load_workload()
self.unique_abFP = {} # reset abFP
self.prepare_workload()
# self.visualize_a() # debug
# TODO: omit update replication for now (old constraints still there)
# update gurobi variables
self.model.remove(self.workload_constrs)
self.update_y_z_w_u_v(self.abFP_num, self.dataset_num, binary=True, log_dir=log_dir)
self.workload_constrs = self.add_workload_constr()
self.model.update()
def load_workload(self):
# format: abstractFingerPrint,db_name,table_name,
# inputDataSize, outputDataSize (in bytes),
# cputime (in seconds)
self.df = self.job_data_access_df
self.df['totalDataSize'] = self.df['inputDataSize'] + self.df['outputDataSize']
k = self.k
        assert 0 < k <= 1, f"k={k} must satisfy 0 < k <= 1"
if k < 1:
# Calculate total inputDataSize for each abstractFingerPrint
abFP_sizes = self.df.groupby('abstractFingerPrint')['totalDataSize'].sum()
# Sort abFPs by inputDataSize in descending order
abFP_sizes = abFP_sizes.sort_values(ascending=False)
# Determine the top k% of abFPs to keep
top_k_count = int(len(abFP_sizes) * k)
top_k_abFPs = abFP_sizes.head(top_k_count).index
# Filter the dataframe to include only the selected top k abFPs
self.df = self.df[self.df['abstractFingerPrint'].isin(top_k_abFPs)]
# Calculate the percentage of accesses retained
total_access_size = abFP_sizes.sum()
top_k_access_size = abFP_sizes.loc[top_k_abFPs].sum()
percent_access_size = (top_k_access_size / total_access_size) * 100
print(f"Top {k * 100:.2f}% of abFPs (# {top_k_count}) "
f"contribute {percent_access_size:.2f}% of total read/write accesses")
self.X_scale = percent_access_size / 100
else:
self.X_scale = 1
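        # Example of the k-filter above: with abFP totals {A: 100, B: 30, C: 10}
        # bytes and k = 0.34, top_k_count = int(3 * 0.34) = 1, so only A is kept
        # and X_scale = 100 / 140 ≈ 0.714, the retained share of traffic.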
assert self.df is not None
row_num = len(self.df)
self.abFP_num = self.df['abstractFingerPrint'].nunique()
self.db_table_num = self.df.groupby(['db_name', 'table_name']).ngroups
print("# row", row_num)
print("# db_table in workload", self.db_table_num)
print("# abFP", self.abFP_num)
self.c = self.df.groupby('abstractFingerPrint')['cputime'].sum()
print("c created")
def prepare_db_table_size(self, file_name):
        # only the columns hive_database_name, hive_table_name, dir_size are used
self.df_table_size = pd.read_csv(file_name)
self.df_table_size['hive_database_name'] = self.df_table_size['hive_database_name'].astype(str)
self.df_table_size['hive_table_name'] = self.df_table_size['hive_table_name'].astype(str)
# how many lines with dir_size != 0
print("# of lines with dir_size != 0", self.df_table_size[self.df_table_size['dir_size'] != 0].shape[0])
        # filter out rows with dir_size == 0
        self.df_table_size = self.df_table_size[self.df_table_size['dir_size'] > 0]
        # how many workload rows have inputDataSize == 0 (note: this inspects
        # self.df, the workload, not the table-size file)
        print("# of workload lines with inputDataSize == 0", self.df[self.df['inputDataSize'] == 0].shape[0])
# dir_size into GB
self.df_table_size['dir_size'] = self.df_table_size['dir_size'] / 1024 ** 3
# the total size of the tables
self.total_storage_gb = self.df_table_size['dir_size'].sum()
print("total size of the tables", human_readable_size(self.total_storage_gb * 1024 ** 3))
def prepare_replication(self):
def _compute_edges():
edge_per_table = defaultdict(int)
for _, row in self.df.iterrows():
idx = f"{row['db_name']}.{row['table_name']}"
edge_per_table[idx] += 1
return edge_per_table
def _compute_access_size():
access_per_table = defaultdict(int)
for _, row in self.df.iterrows():
idx = f"{row['db_name']}.{row['table_name']}"
access_per_table[idx] += row['inputDataSize'] + row['outputDataSize']
return access_per_table
def _load_table_size():
size_per_table = {}
for _, row in self.df_table_size.iterrows():
idx = f"{row['hive_database_name']}.{row['hive_table_name']}"
size_per_table[idx] = row['dir_size']
return size_per_table
assert self.df is not None
print("Replication strategy:", self.rep_strategy)
print("No matter read or write or both, we count as 1 edge")
# print("Default: rank tables by # edges normalized table size (JAD)")
if self.rep_strategy == "job_access_density":
edges = _compute_edges()
size_per_table = _load_table_size()
metric_per_table = {k: edges[k] / size_per_table[k]
if k in size_per_table and size_per_table[k] > 0 else 0 for k in edges}
elif self.rep_strategy == "read_traffic_volume":
metric_per_table = _compute_access_size()
elif self.rep_strategy == "inverse_dataset_size":
edges = _compute_edges() # used only for the keys
size_per_table = _load_table_size()
metric_per_table = {k: 1/size_per_table[k] if k in size_per_table and size_per_table[k] > 0 else 0 for k in edges}
#metric_per_table = {k: 1 / v if v > 0 else 0 for k, v in size_per_table.items()}
elif self.rep_strategy == "job_access_frequency":
metric_per_table = _compute_edges()
elif self.rep_strategy == "read_traffic_density":
access_size = _compute_access_size()
size_per_table = _load_table_size()
metric_per_table = {k: access_size[k] / size_per_table[k]
if k in size_per_table and size_per_table[k] > 0 else 0 for k in access_size}
else:
raise ValueError(f"Unknown replication strategy: {self.rep_strategy}")
sorted_table = {k: v for k, v in sorted(metric_per_table.items(), key=lambda item: -item[1])}
unique_db_tables = {idx: rank for rank, idx in enumerate(sorted_table)}
return unique_db_tables
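    # Ranking metrics used above, for a table t with edge count e(t), total
    # read+write bytes a(t), and on-disk size s(t) (a summary of the branches
    # in prepare_replication, not new behavior):
    #   job_access_density   : e(t) / s(t)
    #   read_traffic_volume  : a(t)
    #   inverse_dataset_size : 1 / s(t)
    #   job_access_frequency : e(t)
    #   read_traffic_density : a(t) / s(t)
    # Tables missing from the size file (or with size 0) get metric 0 and rank last.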
def prepare_workload(self):
""" clean_df() deprecated
def clean_df():
# remove lines in self.df where db.table has no size
print("\ncleaning df start...")
# Create a set of valid db.table combinations from df_table_size
valid_db_tables = set(
self.df_table_size.apply(lambda row: f"{row['hive_database_name']}.{row['hive_table_name']}", axis=1))
initial_num_rows = len(self.df)
initial_total_input_size = self.df['inputDataSize'].sum()
initial_total_cputime = self.df['cputime'].sum()
removed_rows = []
for index, row in self.df.iterrows():
db_table = f"{row['db_name']}.{row['table_name']}"
if db_table not in valid_db_tables:
removed_rows.append(row)
# Remove the rows from the dataframe
self.df = self.df.drop([row.name for row in removed_rows])
# Calculate remaining
remaining_total_input_size = self.df['inputDataSize'].sum()
remaining_total_cputime = self.df['cputime'].sum()
print(f"# of lines removed: {len(removed_rows)} vs {initial_num_rows}")
# print(
# f"# of unique tables involved in removal: {len(set(f'{row['db_name']}.{row['table_name']}' for row in removed_rows))}")
print(f"Total CPU time influenced: {initial_total_cputime - remaining_total_cputime:.0f}"
f" ({(initial_total_cputime - remaining_total_cputime)/initial_total_cputime*100:.1f}%)")
print(f"Total inputDataSize influenced: {human_readable_size(initial_total_input_size-remaining_total_input_size)}"
f" vs {human_readable_size(initial_total_input_size)}")
self.abFP_num = self.df['abstractFingerPrint'].nunique()
self.db_table_num = self.df.groupby(['db_name', 'table_name']).ngroups
print("[After clean] # abFP", self.abFP_num)
print("[After clean] # db_table in workload", self.db_table_num)
self.c = self.df.groupby('abstractFingerPrint')['cputime'].sum()
print("[After clean] compute c", self.c)
print("cleaning df end...\n", flush=True) """
def prepare_unique_abFP():
counter_a = len(self.unique_abFP) # old
counter_t = len(self.unique_db_tables) # old
print(f"from {counter_a} x {counter_t}", end="")
for index, row in self.df.iterrows():
i_string = row['abstractFingerPrint']
j_string = f"{row['db_name']}.{row['table_name']}"
if i_string not in self.unique_abFP:
self.unique_abFP[i_string] = counter_a
counter_a += 1
if j_string not in self.unique_db_tables:
self.unique_db_tables[j_string] = counter_t
counter_t += 1
assert counter_a == len(self.unique_abFP) and counter_t == len(self.unique_db_tables)
print(f" to {counter_a} x {counter_t}")
self.abFP_num = len(self.unique_abFP)
self.db_table_num = len(self.unique_db_tables)
self.adj_list_input = defaultdict(dict)
self.adj_list_output = defaultdict(dict)
for index, row in self.df.iterrows():
i_string = row['abstractFingerPrint']
j_string = f"{row['db_name']}.{row['table_name']}" # This needs proper mapping
i = self.unique_abFP[i_string]
j = self.unique_db_tables[j_string]
self.adj_list_input[j][i] = row['inputDataSize'] / 1024 ** 3 # convert to GB
self.adj_list_output[j][i] = row['outputDataSize'] / 1024 ** 3
# if clean:
# clean_df()
prepare_unique_abFP()
counter_t = len(self.unique_db_tables)
pair_num = len(self.df)
print("[sanity check] # of non-zero edges [i,j]", pair_num, flush=True)
# print("should no more than # of rows in self.df", len(self.df), flush=True)
        # expand the size vector s to cover every db_table
assert self.df_table_size is not None
# Identify the db_tables present in self.df_table_size
db_tables_in_df = set(
self.df_table_size.apply(lambda row: f"{row['hive_database_name']}.{row['hive_table_name']}", axis=1))
# Find the db_tables that are in self.df_table_size but not in self.unique_db_tables
extra_db_tables = db_tables_in_df - set(self.unique_db_tables.keys())
if self.yugong:
self.df_table_size['project'] = self.df_table_size.apply(
lambda row: self.ownership.get_table_ownership(f"{row['hive_database_name']}.{row['hive_table_name']}"), axis=1)
missing_sizes = self.df_table_size[self.df_table_size.apply(
lambda row: f"{row['hive_database_name']}.{row['hive_table_name']}" in extra_db_tables, axis=1)]
grouped_sizes = missing_sizes.groupby('project')['dir_size'].sum()
group_num = len(grouped_sizes)
print("# of grouped untouched projects this time period", group_num, flush=True, end=' ')
for project in grouped_sizes.index:
j_string = f"{project}.group"
self.ownership.add_table_ownership(j_string, project)
assert self.ownership.get_table_ownership(j_string) == project, f"Ownership not set for {j_string}"
else:
# Filter self.df_table_size to keep only the extra db_tables, then group by hive_database_name and sum the sizes
missing_sizes = self.df_table_size[self.df_table_size.apply(
lambda row: f"{row['hive_database_name']}.{row['hive_table_name']}" in extra_db_tables, axis=1)]
grouped_sizes = missing_sizes.groupby('hive_database_name')['dir_size'].sum()
group_num = len(grouped_sizes)
print("# of grouped untouched dbs this time period", group_num, flush=True, end=' ')
count_bf = len(self.unique_db_tables)
# allocate id for untouched dbs
for db_name in grouped_sizes.index:
j_string = f"{db_name}.group"
if j_string not in self.unique_db_tables:
self.unique_db_tables[j_string] = counter_t
counter_t += 1
print("(only", counter_t - count_bf, "newly appeared dbs)", flush=True)
# self.dataset_num = self.db_table_num + group_num
assert counter_t == len(self.unique_db_tables), f"{counter_t} != {len(self.unique_db_tables)}"
self.dataset_num = counter_t
for table_id in range(self.db_table_num, self.dataset_num):
self.adj_list_input[table_id] = {}
self.adj_list_output[table_id] = {}
print(f"adjacency lists have tables {len(self.adj_list_input)}", flush=True)
self.s = np.zeros(self.dataset_num)
start = time.time()
size_lookup = {
(row.hive_database_name, row.hive_table_name): row.dir_size
for row in self.df_table_size.itertuples(index=False)
}
sum_hot_gb = 0
# set_size = self.df_table_size['dir_size'].min()
set_size = 0
for db_table in self.unique_db_tables:
parts = db_table.split('.')
if len(parts) != 2:
raise ValueError(f"Invalid db_table format: {db_table} into {parts}")
db_name, table_name = parts
size = size_lookup.get((db_name, table_name), None)
if size is None or size == 0:
if db_name in grouped_sizes.index:
continue # delayed to next code block
# print("Warning: db_table not found in table size file", db_table, "set to", set_size)
self.s[self.unique_db_tables[db_table]] = set_size
sum_hot_gb += set_size
else:
self.s[self.unique_db_tables[db_table]] = size
sum_hot_gb += size
print(f"Touched data size: {human_readable_size(sum_hot_gb * 1024 ** 3)}")
# cold dataset that is not in the workload
sum_gb = 0
for db_name in map(str, grouped_sizes.index):
j_string = f"{db_name}.group"
self.s[self.unique_db_tables[j_string]] = grouped_sizes[db_name]
sum_gb += grouped_sizes[db_name]
print(f"Non-touched data size: {human_readable_size(sum_gb * 1024 ** 3)}")
print_time(start, time.time(), "s created")
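    # After prepare_workload(), s[j] holds the size in GB of dataset id j:
    # tables touched by the workload get their own size from the size file,
    # while untouched tables are collapsed into a synthetic per-db (or, in
    # Yugong mode, per-project) "<name>.group" row carrying their summed size.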
def prepare_basic_model(self):
if self.model is None:
self.model = Model("query_on_db_table")
if self.model is None:
print('Could not create model')
return None
        # Cap the number of solver threads (tune to the machine's core count)
        self.model.setParam('Threads', 12)
        self.model.setParam("NodefileStart", 0.1)  # write MIP nodes to disk once node storage exceeds 0.1 GB
        self.model.setParam("NodefileDir", ".")
        self.model.setParam("MemLimit", 64)  # MemLimit is measured in GB: cap at 64 GB
def add_y_z_w_u_v(self, N, M, binary=True, log_dir='.', file=sys.stdout):
u = {}
v = {}
pair_num = len(self.df)
temp_counter = 0
step = pair_num // 10
start = time.time()
print("init y,z,w", file=file, flush=True)
assert binary is True
# Decision variables
# y[i]: If workload i is executed in D1, y[i] = 0; if in D2, y[i] = 1.
y = self.model.addVars(N, vtype=GRB.BINARY, name='y')
        # z[j]: If dataset j exists in D1, z[j] = 0; else, z[j] = 1.
        z = self.model.addVars(M, vtype=GRB.BINARY, name='z')
        # w[j]: If dataset j exists in D2, w[j] = 0; else, w[j] = 1.
        w = self.model.addVars(M, vtype=GRB.BINARY, name='w')
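        # Placement encoding (D1 = on-prem, D2 = cloud): z[j] = 1 / w[j] = 1
        # mean "absent from D1 / D2". z[j] + w[j] <= 1 rules out the
        # stored-nowhere state z = w = 1 and allows z = w = 0 (replicated on
        # both sites); z[j] + w[j] == 1, used when rep_threshold < 0,
        # additionally forbids replication.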
print_time(start, time.time(), "y,z,w created", file=file)
if self.yugong:
print("Enforce replication on the same set of tables with Moirai")
print("rep_threshold", self.rep_threshold)
# print(f"rep_threshold={self.rep_threshold} < 0, z+w=1, no replication")
replicated_indices = set()
for table in self.rep_list:
j = self.unique_db_tables.get(table, None)
if j is None:
print(f"[Warning] Table {table} not found in unique_db_tables, skip for replication")
continue
self.rep_constr.append(self.model.addConstr(z[j] == 0, name=f'z_{j}_0'))
self.rep_constr.append(self.model.addConstr(w[j] == 0, name=f'w_{j}_0'))
replicated_indices.add(j)
# # First, replicate 0.004 of data
# threshold_gb = self.total_storage_gb * 0.004
# print(f"Replicate 0.4% data = {human_readable_size(threshold_gb * 1024 ** 3)}")
# total_size_gb, rep_count = 0, 0
# replicated_indices = set()
# for j in range(M):
# if self.s[j] > 0:
# if total_size_gb + self.s[j] >= threshold_gb + 1024: # 1TB buffer
# continue
# total_size_gb += self.s[j]
# self.rep_constr.append(self.model.addConstr(z[j] == 0, name=f'z_{j}_0'))
# self.rep_constr.append(self.model.addConstr(w[j] == 0, name=f'w_{j}_0'))
# replicated_indices.add(j)
# rep_count += 1
# for key in self.unique_db_tables:
# if j == self.unique_db_tables[key]:
# self.rep_list.append(key)
# if total_size_gb >= threshold_gb:
# print(
# f"Replicated Total {rep_count} Tables of {human_readable_size(total_size_gb * 1024 ** 3)} till pos {j}")
# print(f"# of replicated tables logged down:", len(self.rep_list))
# break
            # Then enforce the existence of each table
for j in range(M):
if j not in replicated_indices:
self.model.addConstr(z[j] + w[j] <= 1, name=f'zw_{j}')
else:
if self.rep_threshold is not None and self.rep_threshold < 0:
print(f"rep_threshold={self.rep_threshold} < 0, z+w=1, no replication")
for j in range(M):
self.model.addConstr(z[j] + w[j] == 1, name=f'zw_{j}')
else:
print(f"rep_threshold={self.rep_threshold}, z+w<=1, replication allowed")
for j in range(M):
self.model.addConstr(z[j] + w[j] <= 1, name=f'zw_{j}')
if self.rep_threshold is not None and self.rep_threshold > 0:
assert len(self.rep_list) == 0
threshold_gb = self.total_storage_gb * self.rep_threshold
print(
f"replicate top {self.rep_threshold * 100:.2f}% data = {human_readable_size(threshold_gb * 1024 ** 3)}")
counter, total_size_gb, rep_count = 0, 0, 0
for j in range(M):
if self.s[j] > 0:
if total_size_gb + self.s[j] >= threshold_gb + 1024: # 1TB
continue
total_size_gb += self.s[j]
self.rep_constr.append(self.model.addConstr(z[j] == 0, name=f'z_{j}_0'))
self.rep_constr.append(self.model.addConstr(w[j] == 0, name=f'w_{j}_0'))
rep_count += 1
for key in self.unique_db_tables:
if j == self.unique_db_tables[key]:
self.rep_list.append(key)
break
if total_size_gb >= threshold_gb:
print(f"Replicate Total {rep_count} Tables of {human_readable_size(total_size_gb * 1024 ** 3)} till pos {j}")
print(f"# of replicated tables logged down:", len(self.rep_list))
with open(os.path.join(log_dir, f"replicated_tables_{self.rep_threshold}_{self.rep_strategy}.csv"), 'w') as f:
f.write("replicated_tables\n")
for key in self.rep_list:
f.write(f"{key}\n")
break
start = time.time()
print("If sample rate < 1, progress is usually under-estimated, nothing wrong happened")
print("init u, v", file=file, flush=True)
        replicated_indices = [self.unique_db_tables[key] for key in self.rep_list if key in self.unique_db_tables]  # guard: in Yugong mode a replicated table may be absent
for j in range(M):
if j in replicated_indices:
continue
job_ids = self.adj_list_input[j].keys()
for i in job_ids:
u[(i, j)] = self.model.addVar(vtype=GRB.BINARY, name=f'u_{i}_{j}')
v[(i, j)] = self.model.addVar(vtype=GRB.BINARY, name=f'v_{i}_{j}')
temp_counter += 1
if step != 0 and temp_counter % step == 0:
print(f"== progress:{temp_counter / pair_num * 100:.0f}%", file=file, flush=True)
print_time(start, time.time(), "u, v created", file=file)
if self.yugong:
project_list = []
# print("unique_db_tables", self.unique_db_tables, file=file, flush=True) # debug
for key in self.unique_db_tables:
table_ownership = self.ownership.get_table_ownership(key)
# assert table_ownership is not None, f"Table ownership not found for {key}"
if table_ownership not in project_list:
project_list.append(table_ownership)
for key in self.unique_abFP:
query_ownership = key
if query_ownership not in project_list:
project_list.append(query_ownership)
print("# of projects", len(project_list), file=file, flush=True)
print("project_list", project_list, file=file, flush=True) # debug
# init a dict for each project to store the jobs in the project
project_jobs = {project: [] for project in project_list}
for key in self.unique_abFP:
i = self.unique_abFP[key]
project = key
project_jobs[project].append(i)
# Add constraints for each project to ensure all jobs are either on-premises or in the cloud
project_vars = {project: self.model.addVar(vtype=GRB.BINARY, name=f'y_project_{project}') for project in project_list}
for project in project_jobs:
project_var = project_vars[project]
for i in project_jobs[project]:
self.model.addConstr(y[i] == project_var, name=f'y_project_{project}_{i}')
project_tables = {project: [] for project in project_list}
for key in self.unique_db_tables:
j = self.unique_db_tables[key]
project = self.ownership.get_table_ownership(key)
project_tables[project].append(j)
# Add constraints for each project to ensure all tables are either on-premises or in the cloud
for project in project_tables:
project_var = project_vars[project]
for j in project_tables[project]:
                    # Replicated tables need no special handling here: with
                    # z[j] = w[j] = 0 they exist on both sites, so wherever the
                    # project is placed the table is available, and both
                    # indicator constraints below are trivially satisfied
self.model.addGenConstrIndicator(project_var, 0, z[j] == 0, name=f'z_project_{project}_{j}')
self.model.addGenConstrIndicator(project_var, 1, w[j] == 0, name=f'w_project_{project}_{j}')
self.y, self.z, self.w, self.u, self.v = (y, z, w, u, v)
def update_y_z_w_u_v(self, N, M, binary=True, log_dir=None):
        if log_dir is None:
            log_dir = self.log_dir
        os.makedirs(log_dir, exist_ok=True)
        file = open(os.path.join(log_dir, "log.txt"), 'a')
old_N = len(self.y) if hasattr(self, 'y') else 0
old_M = len(self.z) if hasattr(self, 'z') else 0
print("old N, M", old_N, old_M)
start = time.time()
print("init y, z, w", file=file, flush=True)
self.model.remove(self.y)
self.y = self.model.addVars(N, vtype=GRB.BINARY, name='y')
# remove replication constr because db_table can change
self.model.remove(self.rep_constr)
self.rep_constr = []
self.model.remove(self.z)
self.model.remove(self.w)
self.z = self.model.addVars(M, vtype=GRB.BINARY, name='z')
self.w = self.model.addVars(M, vtype=GRB.BINARY, name='w')
for j in range(M):
self.model.addConstr(self.z[j] + self.w[j] <= 1, name=f'zw_{j}')
if self.rep_threshold is not None:
for key in self.rep_list:
# if self.unique_db_tables[key] is not None:
if key in self.unique_db_tables:
j = self.unique_db_tables[key]
self.rep_constr.append(self.model.addConstr(self.z[j] == 0, name=f'z_{j}_0'))
self.rep_constr.append(self.model.addConstr(self.w[j] == 0, name=f'w_{j}_0'))
else:
print("Warning: replicated table not found in unique_db_tables", key)
print_time(start, time.time(), "y, z, w created", file=file)
start = time.time()
print("If sample rate < 1, progress is usually under-estimated, nothing wrong happened")
print("init u, v", file=file, flush=True)
        # Rebuild u and v from scratch
self.model.remove(self.u)
self.model.remove(self.v)
self.u = {}
self.v = {}
replicated_indices = [self.unique_db_tables[key] for key in self.rep_list if key in self.unique_db_tables]
for j in range(M):
if j in replicated_indices:
continue
job_ids = self.adj_list_input[j].keys()
for i in job_ids:
self.u[(i, j)] = self.model.addVar(vtype=GRB.BINARY, name=f'u_{i}_{j}')
self.v[(i, j)] = self.model.addVar(vtype=GRB.BINARY, name=f'v_{i}_{j}')
print_time(start, time.time(), "u, v created", file=file)
file.close()
# Update model to integrate new variables
self.model.update()
print("updated to new N, M", len(self.y), len(self.z), "=", len(self.w))
print("updated to new U, V", len(self.u), len(self.v))
def add_workload_constr(self, f_print=sys.stdout):
N = self.abFP_num
M = self.dataset_num
y, z, w, u, v = self.get_y_z_w_u_v()
constrs = []
""" zw[j] = z[j] * w[j] always = 0 """
""" u[i, j] = (1-y[i])*(z[j]-z[j]*w[j)) = (1-y[i])*z[j]
u[i, j] = 1 only when y[i] == 0 (job on-prem) and z[j] == 1 (z represents the state of data on-prem, 1 means not in on-prem)
This should be egress for input data (or say read), ingress for output data (or say write)
"""
pair_num = len(self.df)
temp_counter = 0
step = pair_num // 10
start = time.time()
print("u constr start", file=f_print, flush=True)
replicated_indices = [self.unique_db_tables[key] for key in self.rep_list if key in self.unique_db_tables]
for j in range(M):
if j in replicated_indices:
continue
job_ids = self.adj_list_input[j].keys()
for i in job_ids:
temp_counter += 1
constrs.append(self.model.addConstr(u[(i, j)] + y[i] <= 1, name=f'u1_{i}_{j}'))
constrs.append(self.model.addConstr(u[(i, j)] - z[j] <= 0, name=f'u2_{i}_{j}'))
constrs.append(self.model.addConstr(- y[i] + z[j] - u[(i, j)] <= 0, name=f'u3_{i}_{j}'))
if step != 0 and temp_counter % step == 0:
print(f"== progress:{temp_counter / pair_num * 100:.0f}%", file=f_print, flush=True)
print_time(start, time.time(), "u constr created", file=f_print)
""" v[i, j] = y[i] * (w[j] - z[j] * w[j]) = y[i] * w[j]
v[i, j] = 1 only when y[i] == 1 (job in cloud) and w[j] == 1 (w represents the state of data in cloud, 1 means not in cloud)
This should be ingress for input data (or say read), egress for output data (or say write)
"""
temp_counter = 0
start = time.time()
print("v constr start", file=f_print, flush=True)
for j in range(M):
if j in replicated_indices:
continue
job_ids = self.adj_list_input[j].keys()
for i in job_ids:
# for i in range(N):
# if self.input_matrix_gb[i, j] > 0 or self.output_matrix_gb[i, j] > 0:
temp_counter += 1
constrs.append(self.model.addConstr(v[(i, j)] - y[i] <= 0, name=f'v1_{i}_{j}'))
constrs.append(self.model.addConstr(v[(i, j)] - w[j] <= 0, name=f'v2_{i}_{j}'))
constrs.append(self.model.addConstr(y[i] + w[j] - v[(i, j)] <= 1, name=f'v3_{i}_{j}'))
if step != 0 and temp_counter % step == 0:
print(f"== progress:{temp_counter / pair_num * 100:.0f}%", file=f_print, flush=True)
print_time(start, time.time(), "v constr created", file=f_print)
return constrs
def get_y_z_w_u_v(self):
return self.y, self.z, self.w, self.u, self.v
def update_previous_placement(self, file_path):
self.previous_placement_path = file_path
# header: table,z,w,size
prev_placement_df = pd.read_csv(file_path)
prev_placement_df['table'] = prev_placement_df['table'].astype(str)
prev_placement_df, _ = merge_similar_rows(prev_placement_df, self.yugong)
self.prev_z = len(self.unique_db_tables) * [-1]
self.prev_w = len(self.unique_db_tables) * [-1]
for idx, row in prev_placement_df.iterrows():
db_table = row['table']
j = self.unique_db_tables[db_table]
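            # ids below len(prev_placement_df) are expected to be exactly the
            # previously placed tables; restore_unique_db_tables() called with
            # the same file guarantees this ordering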
assert j is not None and j < len(
prev_placement_df), f"idx {idx}, {db_table}: j={j} >= {len(prev_placement_df)}"
self.prev_z[j] = int(row['z'])
self.prev_w[j] = int(row['w'])
db_tables_list = list(self.unique_db_tables.keys())
count = 0
print("New tables", len(self.unique_db_tables) - len(self.prev_z))
"""
if self.yugong:
self.df_table_size['project'] = self.df_table_size.apply(
lambda row: self.ownership.get_table_ownership(f"{row['hive_database_name']}.{row['hive_table_name']}"), axis=1)
missing_sizes = self.df_table_size[self.df_table_size.apply(
lambda row: f"{row['hive_database_name']}.{row['hive_table_name']}" in extra_db_tables, axis=1)]
grouped_sizes = missing_sizes.groupby('project')['dir_size'].sum()
group_num = len(grouped_sizes)
print("# of grouped untouched projects this time period", group_num, flush=True, end=' ')
for project in grouped_sizes.index:
j_string = f"{project}.group"
self.ownership.add_table_ownership(j_string, "Root|" + project)
assert self.ownership.get_table_ownership(j_string) == project, f"Ownership not set for {j_string}"
"""
for j in range(len(prev_placement_df), len(self.unique_db_tables)):
key = db_tables_list[j]
db_name, table_name = key.split('.')
if self.yugong:
project = self.ownership.get_table_ownership(key)
db_group = f"{project}.group"
else:
db_group = f"{db_name}.group"
if db_group in self.unique_db_tables:
idx = self.unique_db_tables[db_group]
self.prev_z[j] = self.prev_z[idx]
self.prev_w[j] = self.prev_w[idx]
count += 1
else:
#print(f"Warning: {db_group} not found")
self.prev_z[j] = 0
self.prev_w[j] = 0
# debug
for i in range(len(self.prev_z)):
if self.prev_z[i] == -1:
print("Warning: prev_z not updated", i, end=' ')
for key in self.unique_db_tables:
if self.unique_db_tables[key] == i:
print(key)
print("Updated previous placement", count, "times from grouped dbs")
def solve_gurobi(self, p_egress_gb, p2_gb, r_min, r_max, X, dir_path,
s_min=0.0, s_max=1.0, binary=True,
time_limit=60 * 60, alpha=1,
p_network_gb=0
):
if not os.path.exists(dir_path):
os.makedirs(dir_path)
self.model.setParam("LogFile", dir_path + "/gurobi.log")
f_print = open(os.path.join(dir_path, 'log.txt'), 'w')
print("----------------------------------------", file=f_print, flush=True)
print("Inputs: p1, p2, p3, c_min, c_max, X, s_min, s_max, binary", file=f_print, flush=True)
print(p_egress_gb, p2_gb, p_network_gb, r_min, r_max, X, s_min, s_max, binary, file=f_print, flush=True)
print(f"rep_threshold {self.rep_threshold} ({human_readable_size(self.rep_threshold * self.total_storage_gb * 1024**3)})", file=f_print, flush=True)
print("previous placement", self.previous_placement_path, file=f_print, flush=True)
print("workload info", self.workload_print_info, file=f_print, flush=True)
print("db_table_size path", self.db_table_size_path, file=f_print, flush=True)
print(f"k={self.k:.3f}", file=f_print, flush=True)
print("X", int(X), "updated to", int(X * self.X_scale), f"({self.X_scale:.3f})", file=f_print, flush=True)
X = X * self.X_scale
print("YuGong", self.yugong, file=f_print, flush=True)
print("----------------------------------------", file=f_print, flush=True)
self.model.setParam(GRB.Param.TimeLimit, time_limit)
# Define the decision variables
N = self.abFP_num
M = self.dataset_num
print("N, M", N, M, file=f_print, flush=True)
y, z, w, u, v = self.get_y_z_w_u_v()
var_constr = []
# Constraints for computation
total_computation = sum(self.c)
start = time.time()
        comp_expr1 = LinExpr()
        # c is keyed by abstractFingerPrint, and its (sorted) order need not
        # match the ids in unique_abFP, so look up CPU time by key, not position
        for key, i in self.unique_abFP.items():
            comp_expr1.add(y[i], self.c.loc[key])
self.model.addConstr(-comp_expr1 <= -r_min * total_computation, name='comp1')
self.model.addConstr(comp_expr1 <= r_max * total_computation, name='comp2')
print_time(start, time.time(), "r constraints created", file=f_print)
# Constraints for local storage
z_var_constr = []
if s_max < 1:
# total_storage = sum(self.s)
start = time.time()
comp_expr3 = LinExpr()
for j in range(M):
comp_expr3.add(z[j], self.s[j])
z_var_constr.append(
self.model.addConstr(-comp_expr3 <= -(1 - s_max) * self.total_storage_gb, name='local1'))
print_time(start, time.time(), f"local storage constraint created: <="
f" {human_readable_size(s_max * self.total_storage_gb * 1024 ** 3)}"
f" ({s_max:.2f} * {human_readable_size(self.total_storage_gb * 1024 ** 3)})",
file=f_print)
else:
for j in range(M):
z_var_constr.append(
                    self.model.addConstr(z[j] == 0, name=f'z_{j}'))  # z[j] = 0: keep every dataset in D1 since local storage is unlimited
print("s_max >= 1, no local storage constraint", file=f_print, flush=True)
if s_min > 0.0:
start = time.time()
comp_expr4 = LinExpr()
for j in range(M):
comp_expr4.add(z[j], self.s[j])
self.model.addConstr(comp_expr4 <= (1 - s_min) * self.total_storage_gb, name='local2')
print_time(start, time.time(), f"local storage constraint created: >="
f" {human_readable_size(s_min * self.total_storage_gb * 1024 ** 3)}"
f" ({s_min:.2f} * {human_readable_size(self.total_storage_gb * 1024 ** 3)})",
file=f_print)
start = time.time()
# This is network usage constraint, so include every piece of ingress and egress
# that comes from query execution
# data movement suggested by Moirai is not included here
comp_expr2 = LinExpr()
replicated_indices = [self.unique_db_tables[key] for key in self.rep_list if key in self.unique_db_tables]
for j in range(M):
if j in replicated_indices:
continue
job_input = self.adj_list_input[j]
job_output = self.adj_list_output[j]
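            # adj_list_input and adj_list_output are filled in lockstep in
            # prepare_workload(), so their key order matches and the zip below
            # pairs each job with its own input and output sizes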
for i, input_size, output_size in zip(job_input.keys(), job_input.values(), job_output.values()):
if input_size > 0:
comp_expr2.add(v[(i, j)], input_size) # ingress
comp_expr2.add(u[(i, j)], input_size) # egress
if output_size > 0:
comp_expr2.add(v[(i, j)], output_size) # egress
comp_expr2.add(u[(i, j)], output_size) # ingress
self.model.addConstr(comp_expr2 <= X, name='comp3') # bandwidth
print_time(start, time.time(), "X constraint created", file=f_print)
# Set objective function
start = time.time()
obj_expr = LinExpr()
for j in range(M):
obj_expr.add(p2_gb * self.s[j] * (1 - w[j])) # cloud object storage cost
if self.prev_z is not None: # data movement
assert self.prev_w is not None
if alpha > 1:
print("alpha > 1, data movement is disallowed", file=f_print, flush=True)
# existing cloud only dataset should not be removed
var_constr.append(
self.model.addConstr(sum(w[j] if self.prev_w[j] == 0 else 0 for j in range(M)) == 0, name='w_sum'))
# or copied back to onprem
var_constr.append(
self.model.addConstr(sum(1 - z[j]
if self.prev_w[j] == 0 and self.prev_z[j] == 1 else 0
for j in range(M)) == 0, name='z_sum'))
# TODO: this might incur extra replication from onprem to cloud, or more remote data access
else:
print("alpha <= 1, data movement is allowed, but penalized", file=f_print, flush=True)
print("TODO: fake ingress price", p_egress_gb, file=f_print, flush=True)
for j in range(len(self.prev_z)):
if self.prev_z[j] > 0:
obj_expr.add(p_egress_gb * self.s[j] * (self.prev_z[j] - z[j]) * alpha) # egress cost
if self.prev_w[j] > 0:
obj_expr.add(p_egress_gb * self.s[j] * (self.prev_w[j] - w[j]) * alpha) # ingress cost (fake)
""" Deprecated, but might be useful in the future"""
# print("Ingress price", p_network_gb, file=f_print, flush=True)
# for j in range(len(self.prev_z)):
# if self.prev_z[j] > 0:
# obj_expr.add((p_egress_gb+p_network_gb) * self.s[j] * (self.prev_z[j] - z[j]) * alpha) # egress cost
# if self.prev_w[j] > 0:
# obj_expr.add(p_network_gb * self.s[j] * (self.prev_w[j] - w[j]) * alpha) # ingress cost (fake)
for j in range(M):
if j in replicated_indices:
continue
job_input = self.adj_list_input[j]
job_output = self.adj_list_output[j]
for i, input_size, output_size in zip(job_input.keys(), job_input.values(), job_output.values()):
if input_size > 0:
obj_expr.add(u[(i, j)], (p_egress_gb+p_network_gb) * input_size) # egress
obj_expr.add(v[(i, j)], p_network_gb * input_size) # ingress
if output_size > 0:
obj_expr.add(u[(i, j)], p_network_gb * output_size) # ingress
obj_expr.add(v[(i, j)], (p_egress_gb+p_network_gb) * output_size) # egress
# for i in range(N):
# if self.input_matrix_gb[i, j] > 0:
# obj_expr.add(u[(i, j)], (p_egress_gb+p_network_gb) * self.input_matrix_gb[i, j])
# obj_expr.add(v[(i, j)], p_network_gb * self.input_matrix_gb[i, j])
# if self.output_matrix_gb[i, j] > 0:
# obj_expr.add(u[(i, j)], p_network_gb * self.output_matrix_gb[i, j])
# obj_expr.add(v[(i, j)], (p_egress_gb+p_network_gb) * self.output_matrix_gb[i, j])
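        # Objective assembled above and minimized below:
        #   cloud storage : sum_j p2_gb * s[j] * (1 - w[j])
        #   data movement : alpha * p_egress_gb * sum_j s[j] * (prev_z[j] - z[j])
        #                 + alpha * p_egress_gb * sum_j s[j] * (prev_w[j] - w[j])  (fake ingress price)
        #   query traffic : for each non-replicated edge (i, j),
        #                   u[i,j] * ((p_egress_gb + p_network_gb) * input + p_network_gb * output)
        #                 + v[i,j] * (p_network_gb * input + (p_egress_gb + p_network_gb) * output)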
self.model.setObjective(obj_expr, GRB.MINIMIZE)
print_time(start, time.time(), "obj created", file=f_print)
self.model.update()
print('----------------------------------------', file=f_print, flush=True)
# Optimize
start = time.time()
self.model.optimize()
print_time(start, time.time(), "model solved", file=f_print)
# Print solution
print("model status", self.model.status, file=f_print, flush=True)
if self.model.status == GRB.OPTIMAL or self.model.status == GRB.TIME_LIMIT or self.model.status == GRB.INTERRUPTED:
if self.model.status == GRB.OPTIMAL:
print('Optimal solution found', file=f_print, flush=True)
else:
print('Suboptimal solution found', file=f_print, flush=True)
print('Optimal co$t: %g' % self.model.objVal, end=' ', file=f_print, flush=True)
print(f'({self.model.Runtime:.2f} seconds', end=' ', file=f_print, flush=True)
# Print number of iterations
iterations = self.model.IterCount
print(f"in {iterations} iterations)", file=f_print, flush=True)
# Print number of constraints
num_constraints = len(self.model.getConstrs())
print(f"Under # constraints: {num_constraints}", file=f_print, flush=True)
print(f"Computation: {sum(y[i].x * self.c.iloc[i] for i in range(N)):.0f}", end=' ', file=f_print,
flush=True)
# Calculate and print target computation (assuming you have a variable or method to get 'total_computation')
min_target = r_min * total_computation
max_target = r_max * total_computation
print(f"∈ [{min_target:.0f} ({r_min}), "
f"{max_target:.0f} ({r_max})]", file=f_print, flush=True)
local_storage = sum((1 - z[j].x) * self.s[j] for j in range(M))
remote_storage = sum((1 - w[j].x) * self.s[j] for j in range(M))
replication = sum((1 - z[j].x - w[j].x) * self.s[j] for j in range(M))
total = local_storage + remote_storage - replication
print(f"Storage: {human_readable_size(total * 1024 ** 3)} total "
# f"== {human_readable_size(self.total_storage_gb * 1024 ** 3)} total from the dataset file, "
f"{human_readable_size(local_storage * 1024 ** 3)} on-prem "
f" ∈ [{human_readable_size(s_min * total * 1024 ** 3)} ({s_min}), "
f"{human_readable_size(s_max * total * 1024 ** 3)} ({s_max})], "
f"{human_readable_size(remote_storage * 1024 ** 3)} GCP "
f"($ {remote_storage * p2_gb:.0f} cost), "
f"{human_readable_size(replication * 1024 ** 3)} overlap", file=f_print, flush=True)
ingress_gb, egress_gb = 0, 0
for j in range(M):
if j in replicated_indices:
continue
job_input = self.adj_list_input[j]
job_output = self.adj_list_output[j]
for i, input_size, output_size in zip(job_input.keys(), job_input.values(), job_output.values()):
if v[(i, j)].x > 0:
ingress_gb += input_size
egress_gb += output_size
for j in range(M):
if j in replicated_indices:
continue
job_input = self.adj_list_input[j]
job_output = self.adj_list_output[j]
for i, input_size, output_size in zip(job_input.keys(), job_input.values(), job_output.values()):
if u[(i, j)].x > 0:
egress_gb += input_size
ingress_gb += output_size
print(f"Ingress {human_readable_size(ingress_gb * 1024 ** 3)} "
f"< {human_readable_size(X * 1024 ** 3)}", file=f_print, flush=True)
print(f'Egress {human_readable_size(egress_gb * 1024 ** 3)} '
f'{egress_gb * p_egress_gb:.2f} $', file=f_print, flush=True)
print(f"Network {human_readable_size((ingress_gb + egress_gb) * 1024 ** 3)} "
f"{(ingress_gb + egress_gb) * p_network_gb:.2f} $ (Estimated)", file=f_print, flush=True)
# consider data movement
if self.prev_z is not None:
assert self.prev_w is not None
egress_movement = 0
for j in range(len(self.prev_z)):
if self.prev_z[j] > 0:
egress_movement += (self.prev_z[j] - z[j].x) * self.s[j]
# debug
                        if self.prev_z[j] - z[j].x < 0:
                            print(f"Warning: egress {j} ({self.prev_z[j]} - {z[j].x}) * {self.s[j]} = "
                                  f"{human_readable_size((self.prev_z[j] - z[j].x) * self.s[j] * 1024 ** 3)}",
                                  file=f_print, flush=True)
# egress_movement = sum((self.prev_z[j] - z[j].x) * self.s[j]
# if self.prev_z[j] > 0 else 0
# for j in range(len(self.prev_z)))
ingress_movement = sum((self.prev_w[j] - w[j].x) * self.s[j]
if self.prev_w[j] > 0 else 0
for j in range(len(self.prev_w)))
# debug, print the ones that are moved
# for table in self.unique_db_tables:
# j = self.unique_db_tables[table]
# if self.prev_w[j] > 0 and w[j].x == 0:
# print(f"Warning: ingress {table} {j} {self.prev_w[j]} - {w[j].x} * {self.s[j]} = "
# f"{human_readable_size((self.prev_w[j] - w[j].x) * self.s[j] * 1024 ** 3)}",
# file=f_print, flush=True)
print(f"Data movement: {human_readable_size(ingress_movement * 1024 ** 3)} ingress, "
f"{human_readable_size(egress_movement * 1024 ** 3)} "
f"{egress_movement * p_egress_gb:.2f}$ egress, "
f"{human_readable_size((ingress_movement + egress_movement) * 1024 ** 3)} network "
f"{(ingress_movement + egress_movement) * p_network_gb:.2f}$ (Estimated)", file=f_print, flush=True)
# dump z, w, y to file
with open(dir_path + '/dataset_placement.csv', 'w') as f:
# header
f.write('table,z,w,size\n')
for key in self.unique_db_tables:
j = self.unique_db_tables[key]
f.write(f'{key},{z[j].x},{w[j].x},{self.s[j]}\n')
with open(dir_path + '/query_placement.csv', 'w') as f:
# header
f.write('abFP,y\n')
for key in self.unique_abFP:
f.write(f'{key},{y[self.unique_abFP[key]].x}\n')
f_print.close()
        # clean up constraints
print("before clean up", len(self.model.getConstrs()))
self.model.remove(var_constr)
self.model.remove(self.model.getConstrByName('comp1'))
self.model.remove(self.model.getConstrByName('comp2'))
self.model.remove(self.model.getConstrByName('comp3'))
self.model.remove(z_var_constr)
if self.model.getConstrByName('local2') is not None:
self.model.remove(self.model.getConstrByName('local2'))
self.model.update()
print("after clean up", len(self.model.getConstrs()))
print("\n")