in optimizer.py
def solve_gurobi(self, p_egress_gb, p2_gb, r_min, r_max, X, dir_path,
s_min=0.0, s_max=1.0, binary=True,
time_limit=60 * 60, alpha=1,
p_network_gb=0
):
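        """Solve the dataset/query placement MILP with Gurobi.

        Parameter glossary, inferred from how each argument is used in this
        body (a best-effort gloss, not authoritative documentation):
            p_egress_gb  -- egress price per GB (also reused below as a
                            stand-in "fake" ingress price for data movement)
            p2_gb        -- cloud object storage price per GB
            r_min, r_max -- bounds on the fraction of total computation
                            selected by the y variables
            X            -- cap on total network traffic (ingress + egress), GB
            s_min, s_max -- bounds on the on-prem share of total storage
            binary       -- logged below; variable types are created by
                            get_y_z_w_u_v()
            time_limit   -- Gurobi time limit, seconds
            alpha        -- data movement penalty; alpha > 1 disallows movement
            p_network_gb -- per-GB network price used in cost estimates
        """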
if not os.path.exists(dir_path):
os.makedirs(dir_path)
self.model.setParam("LogFile", dir_path + "/gurobi.log")
f_print = open(os.path.join(dir_path, 'log.txt'), 'w')
print("----------------------------------------", file=f_print, flush=True)
print("Inputs: p1, p2, p3, c_min, c_max, X, s_min, s_max, binary", file=f_print, flush=True)
print(p_egress_gb, p2_gb, p_network_gb, r_min, r_max, X, s_min, s_max, binary, file=f_print, flush=True)
print(f"rep_threshold {self.rep_threshold} ({human_readable_size(self.rep_threshold * self.total_storage_gb * 1024**3)})", file=f_print, flush=True)
print("previous placement", self.previous_placement_path, file=f_print, flush=True)
print("workload info", self.workload_print_info, file=f_print, flush=True)
print("db_table_size path", self.db_table_size_path, file=f_print, flush=True)
print(f"k={self.k:.3f}", file=f_print, flush=True)
print("X", int(X), "updated to", int(X * self.X_scale), f"({self.X_scale:.3f})", file=f_print, flush=True)
X = X * self.X_scale
print("YuGong", self.yugong, file=f_print, flush=True)
print("----------------------------------------", file=f_print, flush=True)
self.model.setParam(GRB.Param.TimeLimit, time_limit)
# Define the decision variables
N = self.abFP_num
M = self.dataset_num
print("N, M", N, M, file=f_print, flush=True)
y, z, w, u, v = self.get_y_z_w_u_v()
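        # Variable semantics, inferred from the storage/network accounting
        # below (a reading of the code, not authoritative): y[i] selects the
        # placement of abstract footprint i; z[j] = 0 keeps an on-prem copy of
        # dataset j, w[j] = 0 keeps a cloud copy (z[j] = w[j] = 0 means the
        # dataset is replicated); u[(i, j)] / v[(i, j)] mark remote reads and
        # writes between job i and dataset j in each direction.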
var_constr = []
# Constraints for computation
total_computation = sum(self.c)
start = time.time()
comp_expr1 = LinExpr()
for i in range(N):
comp_expr1.add(y[i], self.c.iloc[i])
        self.model.addConstr(comp_expr1 >= r_min * total_computation, name='comp1')
        self.model.addConstr(comp_expr1 <= r_max * total_computation, name='comp2')
print_time(start, time.time(), "r constraints created", file=f_print)
# Constraints for local storage
z_var_constr = []
if s_max < 1:
# total_storage = sum(self.s)
start = time.time()
comp_expr3 = LinExpr()
for j in range(M):
comp_expr3.add(z[j], self.s[j])
            z_var_constr.append(
                self.model.addConstr(comp_expr3 >= (1 - s_max) * self.total_storage_gb, name='local1'))
print_time(start, time.time(), f"local storage constraint created: <="
f" {human_readable_size(s_max * self.total_storage_gb * 1024 ** 3)}"
f" ({s_max:.2f} * {human_readable_size(self.total_storage_gb * 1024 ** 3)})",
file=f_print)
else:
for j in range(M):
z_var_constr.append(
                    self.model.addConstr(z[j] == 0, name=f'z_{j}'))  # pin z[j] = 0: keep the on-prem (D1) copy, since local storage is unlimited
print("s_max >= 1, no local storage constraint", file=f_print, flush=True)
if s_min > 0.0:
start = time.time()
comp_expr4 = LinExpr()
for j in range(M):
comp_expr4.add(z[j], self.s[j])
self.model.addConstr(comp_expr4 <= (1 - s_min) * self.total_storage_gb, name='local2')
print_time(start, time.time(), f"local storage constraint created: >="
f" {human_readable_size(s_min * self.total_storage_gb * 1024 ** 3)}"
f" ({s_min:.2f} * {human_readable_size(self.total_storage_gb * 1024 ** 3)})",
file=f_print)
start = time.time()
        # Network usage constraint: count every byte of ingress and egress
        # generated by query execution; data movement suggested by Moirai is
        # not included here.
comp_expr2 = LinExpr()
replicated_indices = [self.unique_db_tables[key] for key in self.rep_list if key in self.unique_db_tables]
for j in range(M):
if j in replicated_indices:
continue
job_input = self.adj_list_input[j]
job_output = self.adj_list_output[j]
for i, input_size, output_size in zip(job_input.keys(), job_input.values(), job_output.values()):
if input_size > 0:
comp_expr2.add(v[(i, j)], input_size) # ingress
comp_expr2.add(u[(i, j)], input_size) # egress
if output_size > 0:
comp_expr2.add(v[(i, j)], output_size) # egress
comp_expr2.add(u[(i, j)], output_size) # ingress
self.model.addConstr(comp_expr2 <= X, name='comp3') # bandwidth
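        # Note: the zip above assumes job_output enumerates the same keys, in
        # the same order, as job_input. Per 'comp3', X caps the combined
        # ingress + egress volume, not either direction alone.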
print_time(start, time.time(), "X constraint created", file=f_print)
# Set objective function
start = time.time()
obj_expr = LinExpr()
for j in range(M):
obj_expr.add(p2_gb * self.s[j] * (1 - w[j])) # cloud object storage cost
if self.prev_z is not None: # data movement
assert self.prev_w is not None
if alpha > 1:
print("alpha > 1, data movement is disallowed", file=f_print, flush=True)
                # existing cloud-only datasets must not be removed from the cloud,
var_constr.append(
self.model.addConstr(sum(w[j] if self.prev_w[j] == 0 else 0 for j in range(M)) == 0, name='w_sum'))
                # nor copied back to on-prem
var_constr.append(
self.model.addConstr(sum(1 - z[j]
if self.prev_w[j] == 0 and self.prev_z[j] == 1 else 0
for j in range(M)) == 0, name='z_sum'))
# TODO: this might incur extra replication from onprem to cloud, or more remote data access
else:
print("alpha <= 1, data movement is allowed, but penalized", file=f_print, flush=True)
print("TODO: fake ingress price", p_egress_gb, file=f_print, flush=True)
for j in range(len(self.prev_z)):
if self.prev_z[j] > 0:
obj_expr.add(p_egress_gb * self.s[j] * (self.prev_z[j] - z[j]) * alpha) # egress cost
if self.prev_w[j] > 0:
obj_expr.add(p_egress_gb * self.s[j] * (self.prev_w[j] - w[j]) * alpha) # ingress cost (fake)
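                # Worked example of the penalty, under the z/w reading above:
                # prev_z[j] = 1 and z[j] = 0 means dataset j gains an on-prem
                # copy, so its cloud copy is egressed once, contributing
                # alpha * p_egress_gb * s[j]; an unchanged placement adds 0.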
""" Deprecated, but might be useful in the future"""
# print("Ingress price", p_network_gb, file=f_print, flush=True)
# for j in range(len(self.prev_z)):
# if self.prev_z[j] > 0:
# obj_expr.add((p_egress_gb+p_network_gb) * self.s[j] * (self.prev_z[j] - z[j]) * alpha) # egress cost
# if self.prev_w[j] > 0:
# obj_expr.add(p_network_gb * self.s[j] * (self.prev_w[j] - w[j]) * alpha) # ingress cost (fake)
for j in range(M):
if j in replicated_indices:
continue
job_input = self.adj_list_input[j]
job_output = self.adj_list_output[j]
for i, input_size, output_size in zip(job_input.keys(), job_input.values(), job_output.values()):
if input_size > 0:
obj_expr.add(u[(i, j)], (p_egress_gb+p_network_gb) * input_size) # egress
obj_expr.add(v[(i, j)], p_network_gb * input_size) # ingress
if output_size > 0:
obj_expr.add(u[(i, j)], p_network_gb * output_size) # ingress
obj_expr.add(v[(i, j)], (p_egress_gb+p_network_gb) * output_size) # egress
# for i in range(N):
# if self.input_matrix_gb[i, j] > 0:
# obj_expr.add(u[(i, j)], (p_egress_gb+p_network_gb) * self.input_matrix_gb[i, j])
# obj_expr.add(v[(i, j)], p_network_gb * self.input_matrix_gb[i, j])
# if self.output_matrix_gb[i, j] > 0:
# obj_expr.add(u[(i, j)], p_network_gb * self.output_matrix_gb[i, j])
# obj_expr.add(v[(i, j)], (p_egress_gb+p_network_gb) * self.output_matrix_gb[i, j])
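        # Pricing recap for the loop above: egress traffic (u on inputs, v on
        # outputs) is charged p_egress_gb + p_network_gb per GB, while ingress
        # traffic (v on inputs, u on outputs) is charged p_network_gb only.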
self.model.setObjective(obj_expr, GRB.MINIMIZE)
print_time(start, time.time(), "obj created", file=f_print)
self.model.update()
print('----------------------------------------', file=f_print, flush=True)
# Optimize
start = time.time()
self.model.optimize()
print_time(start, time.time(), "model solved", file=f_print)
# Print solution
print("model status", self.model.status, file=f_print, flush=True)
        if self.model.status in (GRB.OPTIMAL, GRB.TIME_LIMIT, GRB.INTERRUPTED):
if self.model.status == GRB.OPTIMAL:
print('Optimal solution found', file=f_print, flush=True)
else:
print('Suboptimal solution found', file=f_print, flush=True)
            print('Objective cost: %g' % self.model.objVal, end=' ', file=f_print, flush=True)
print(f'({self.model.Runtime:.2f} seconds', end=' ', file=f_print, flush=True)
# Print number of iterations
iterations = self.model.IterCount
print(f"in {iterations} iterations)", file=f_print, flush=True)
# Print number of constraints
num_constraints = len(self.model.getConstrs())
print(f"Under # constraints: {num_constraints}", file=f_print, flush=True)
print(f"Computation: {sum(y[i].x * self.c.iloc[i] for i in range(N)):.0f}", end=' ', file=f_print,
flush=True)
            # Print the target computation range (total_computation was computed above)
min_target = r_min * total_computation
max_target = r_max * total_computation
print(f"∈ [{min_target:.0f} ({r_min}), "
f"{max_target:.0f} ({r_max})]", file=f_print, flush=True)
local_storage = sum((1 - z[j].x) * self.s[j] for j in range(M))
remote_storage = sum((1 - w[j].x) * self.s[j] for j in range(M))
replication = sum((1 - z[j].x - w[j].x) * self.s[j] for j in range(M))
total = local_storage + remote_storage - replication
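            # Inclusion-exclusion check: local + remote - overlap counts each
            # dataset exactly once, since (1 - z) + (1 - w) - (1 - z - w) = 1.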
print(f"Storage: {human_readable_size(total * 1024 ** 3)} total "
# f"== {human_readable_size(self.total_storage_gb * 1024 ** 3)} total from the dataset file, "
f"{human_readable_size(local_storage * 1024 ** 3)} on-prem "
f" ∈ [{human_readable_size(s_min * total * 1024 ** 3)} ({s_min}), "
f"{human_readable_size(s_max * total * 1024 ** 3)} ({s_max})], "
f"{human_readable_size(remote_storage * 1024 ** 3)} GCP "
f"($ {remote_storage * p2_gb:.0f} cost), "
f"{human_readable_size(replication * 1024 ** 3)} overlap", file=f_print, flush=True)
ingress_gb, egress_gb = 0, 0
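            # Tally the realized traffic from the solved u/v values, mirroring
            # the coefficients of the 'comp3' constraint: v moves inputs in
            # and outputs out; u does the opposite.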
            for j in range(M):
                if j in replicated_indices:
                    continue
                job_input = self.adj_list_input[j]
                job_output = self.adj_list_output[j]
                for i, input_size, output_size in zip(job_input.keys(), job_input.values(), job_output.values()):
                    if v[(i, j)].x > 0:
                        ingress_gb += input_size
                        egress_gb += output_size
                    if u[(i, j)].x > 0:
                        egress_gb += input_size
                        ingress_gb += output_size
print(f"Ingress {human_readable_size(ingress_gb * 1024 ** 3)} "
f"< {human_readable_size(X * 1024 ** 3)}", file=f_print, flush=True)
print(f'Egress {human_readable_size(egress_gb * 1024 ** 3)} '
f'{egress_gb * p_egress_gb:.2f} $', file=f_print, flush=True)
print(f"Network {human_readable_size((ingress_gb + egress_gb) * 1024 ** 3)} "
f"{(ingress_gb + egress_gb) * p_network_gb:.2f} $ (Estimated)", file=f_print, flush=True)
# consider data movement
if self.prev_z is not None:
assert self.prev_w is not None
egress_movement = 0
for j in range(len(self.prev_z)):
if self.prev_z[j] > 0:
egress_movement += (self.prev_z[j] - z[j].x) * self.s[j]
# debug
                        if self.prev_z[j] - z[j].x < 0:
                            print(f"Warning: egress {j} ({self.prev_z[j]} - {z[j].x}) * {self.s[j]} = "
                                  f"{human_readable_size((self.prev_z[j] - z[j].x) * self.s[j] * 1024 ** 3)}",
                                  file=f_print, flush=True)
# egress_movement = sum((self.prev_z[j] - z[j].x) * self.s[j]
# if self.prev_z[j] > 0 else 0
# for j in range(len(self.prev_z)))
ingress_movement = sum((self.prev_w[j] - w[j].x) * self.s[j]
if self.prev_w[j] > 0 else 0
for j in range(len(self.prev_w)))
# debug, print the ones that are moved
# for table in self.unique_db_tables:
# j = self.unique_db_tables[table]
# if self.prev_w[j] > 0 and w[j].x == 0:
# print(f"Warning: ingress {table} {j} {self.prev_w[j]} - {w[j].x} * {self.s[j]} = "
# f"{human_readable_size((self.prev_w[j] - w[j].x) * self.s[j] * 1024 ** 3)}",
# file=f_print, flush=True)
print(f"Data movement: {human_readable_size(ingress_movement * 1024 ** 3)} ingress, "
f"{human_readable_size(egress_movement * 1024 ** 3)} "
f"{egress_movement * p_egress_gb:.2f}$ egress, "
f"{human_readable_size((ingress_movement + egress_movement) * 1024 ** 3)} network "
f"{(ingress_movement + egress_movement) * p_network_gb:.2f}$ (Estimated)", file=f_print, flush=True)
# dump z, w, y to file
            with open(os.path.join(dir_path, 'dataset_placement.csv'), 'w') as f:
# header
f.write('table,z,w,size\n')
for key in self.unique_db_tables:
j = self.unique_db_tables[key]
f.write(f'{key},{z[j].x},{w[j].x},{self.s[j]}\n')
            with open(os.path.join(dir_path, 'query_placement.csv'), 'w') as f:
# header
f.write('abFP,y\n')
for key in self.unique_abFP:
f.write(f'{key},{y[self.unique_abFP[key]].x}\n')
f_print.close()
        # clean up the per-solve constraints so the model can be re-solved with new parameters
print("before clean up", len(self.model.getConstrs()))
self.model.remove(var_constr)
self.model.remove(self.model.getConstrByName('comp1'))
self.model.remove(self.model.getConstrByName('comp2'))
self.model.remove(self.model.getConstrByName('comp3'))
self.model.remove(z_var_constr)
if self.model.getConstrByName('local2') is not None:
self.model.remove(self.model.getConstrByName('local2'))
self.model.update()
print("after clean up", len(self.model.getConstrs()))
print("\n")