in optimizer.py [0:0]
def add_y_z_w_u_v(self, N, M, binary=True, log_dir='.', file=sys.stdout):
    """Create the placement decision variables and replication constraints.

    Adds to ``self.model``:
      * ``y[i]`` (i in 0..N-1): 0 if workload i is executed in D1, 1 if in D2.
      * ``z[j]`` (j in 0..M-1): 0 if dataset j exists in D1, 1 otherwise.
      * ``w[j]`` (j in 0..M-1): 0 if dataset j exists in D2, 1 otherwise.
      * ``u[(i, j)]`` / ``v[(i, j)]``: binary linkage variables for each
        (workload, dataset) edge in ``self.adj_list_input`` whose dataset is
        not replicated.

    Replication handling:
      * ``self.yugong`` True: replicate exactly the tables in ``self.rep_list``
        (forcing ``z[j] == 0`` and ``w[j] == 0``) and require every other
        table to exist in at most one site (``z + w <= 1``).
      * otherwise, ``self.rep_threshold < 0`` forbids replication
        (``z + w == 1``); ``rep_threshold > 0`` allows it and greedily
        replicates tables until roughly ``total_storage_gb * rep_threshold``
        GB are pinned to both sites, logging the chosen tables to a CSV in
        ``log_dir``.

    Args:
        N: number of workloads.
        M: number of datasets (tables).
        binary: must be True; only binary decision variables are supported.
        log_dir: directory for the replicated-tables CSV (threshold mode only).
        file: stream for progress/log output.

    Side effects: appends to ``self.rep_constr`` / ``self.rep_list`` and
    stores ``(y, z, w, u, v)`` on ``self``.
    """
    u = {}
    v = {}
    pair_num = len(self.df)
    temp_counter = 0
    step = pair_num // 10  # report progress roughly every 10% of pairs
    start = time.time()
    print("init y,z,w", file=file, flush=True)
    assert binary is True
    # Decision variables
    # y[i]: If workload i is executed in D1, y[i] = 0; if in D2, y[i] = 1.
    y = self.model.addVars(N, vtype=GRB.BINARY, name='y')
    # z[j]: If dataset j exists in D1, z[j] = 0; else, z[j] = 1.
    z = self.model.addVars(M, vtype=GRB.BINARY, name='z')
    # w[j]: If dataset j exists in D2, w[j] = 0; else, w[j] = 1.
    w = self.model.addVars(M, vtype=GRB.BINARY, name='w')
    print_time(start, time.time(), "y,z,w created", file=file)
    if self.yugong:
        print("Enforce replication on the same set of tables with Moirai")
        print("rep_threshold", self.rep_threshold)
        # Replicate exactly the tables named in self.rep_list: force each
        # one to be present at both sites (z == 0 and w == 0).
        replicated_indices = set()
        for table in self.rep_list:
            j = self.unique_db_tables.get(table, None)
            if j is None:
                print(f"[Warning] Table {table} not found in unique_db_tables, skip for replication")
                continue
            self.rep_constr.append(self.model.addConstr(z[j] == 0, name=f'z_{j}_0'))
            self.rep_constr.append(self.model.addConstr(w[j] == 0, name=f'w_{j}_0'))
            replicated_indices.add(j)
        # Then enforce existence of each non-replicated table in at most one site.
        for j in range(M):
            if j not in replicated_indices:
                self.model.addConstr(z[j] + w[j] <= 1, name=f'zw_{j}')
    else:
        if self.rep_threshold is not None and self.rep_threshold < 0:
            print(f"rep_threshold={self.rep_threshold} < 0, z+w=1, no replication")
            for j in range(M):
                self.model.addConstr(z[j] + w[j] == 1, name=f'zw_{j}')
        else:
            print(f"rep_threshold={self.rep_threshold}, z+w<=1, replication allowed")
            for j in range(M):
                self.model.addConstr(z[j] + w[j] <= 1, name=f'zw_{j}')
            if self.rep_threshold is not None and self.rep_threshold > 0:
                assert len(self.rep_list) == 0
                threshold_gb = self.total_storage_gb * self.rep_threshold
                print(
                    f"replicate top {self.rep_threshold * 100:.2f}% data = {human_readable_size(threshold_gb * 1024 ** 3)}")
                # Reverse index -> table-name map, built once. (The previous
                # version re-scanned unique_db_tables per replicated table.)
                # setdefault keeps the FIRST key mapping to an index, matching
                # the original first-match-then-break semantics.
                index_to_table = {}
                for table_key, idx in self.unique_db_tables.items():
                    index_to_table.setdefault(idx, table_key)
                total_size_gb, rep_count = 0, 0
                for j in range(M):
                    if self.s[j] > 0:
                        # Skip tables that would overshoot the budget by more
                        # than the 1 TB buffer.
                        if total_size_gb + self.s[j] >= threshold_gb + 1024:  # 1TB
                            continue
                        total_size_gb += self.s[j]
                        self.rep_constr.append(self.model.addConstr(z[j] == 0, name=f'z_{j}_0'))
                        self.rep_constr.append(self.model.addConstr(w[j] == 0, name=f'w_{j}_0'))
                        rep_count += 1
                        table_key = index_to_table.get(j)
                        if table_key is not None:
                            self.rep_list.append(table_key)
                        if total_size_gb >= threshold_gb:
                            print(f"Replicate Total {rep_count} Tables of {human_readable_size(total_size_gb * 1024 ** 3)} till pos {j}")
                            print(f"# of replicated tables logged down:", len(self.rep_list))
                            with open(os.path.join(log_dir, f"replicated_tables_{self.rep_threshold}_{self.rep_strategy}.csv"), 'w') as f:
                                f.write("replicated_tables\n")
                                for key in self.rep_list:
                                    f.write(f"{key}\n")
                            break
    start = time.time()
    print("If sample rate < 1, progress is usually under-estimated, nothing wrong happened")
    print("init u, v", file=file, flush=True)
    # Use a set: membership is tested once per dataset in the loop below
    # (the previous list made this O(M * len(rep_list))).
    replicated_indices = {self.unique_db_tables[key] for key in self.rep_list}
    for j in range(M):
        if j in replicated_indices:
            # Replicated tables exist at both sites; no linkage vars needed.
            continue
        job_ids = self.adj_list_input[j].keys()
        for i in job_ids:
            u[(i, j)] = self.model.addVar(vtype=GRB.BINARY, name=f'u_{i}_{j}')
            v[(i, j)] = self.model.addVar(vtype=GRB.BINARY, name=f'v_{i}_{j}')
            temp_counter += 1
            if step != 0 and temp_counter % step == 0:
                print(f"== progress:{temp_counter / pair_num * 100:.0f}%", file=file, flush=True)
    print_time(start, time.time(), "u, v created", file=file)
    if self.yugong:
        # Collect every distinct project: owners of tables plus owners of
        # queries (unique_abFP keys are query ownership labels).
        project_list = []
        for key in self.unique_db_tables:
            table_ownership = self.ownership.get_table_ownership(key)
            # NOTE(review): ownership may be None here (assert was disabled);
            # None then becomes its own "project" bucket below.
            if table_ownership not in project_list:
                project_list.append(table_ownership)
        for key in self.unique_abFP:
            query_ownership = key
            if query_ownership not in project_list:
                project_list.append(query_ownership)
        print("# of projects", len(project_list), file=file, flush=True)
        print("project_list", project_list, file=file, flush=True)  # debug
        # init a dict for each project to store the jobs in the project
        project_jobs = {project: [] for project in project_list}
        for key in self.unique_abFP:
            i = self.unique_abFP[key]
            project = key
            project_jobs[project].append(i)
        # Add constraints for each project to ensure all jobs are either on-premises or in the cloud:
        # one binary per project, and every job in the project is tied to it.
        project_vars = {project: self.model.addVar(vtype=GRB.BINARY, name=f'y_project_{project}') for project in project_list}
        for project in project_jobs:
            project_var = project_vars[project]
            for i in project_jobs[project]:
                self.model.addConstr(y[i] == project_var, name=f'y_project_{project}_{i}')
        project_tables = {project: [] for project in project_list}
        for key in self.unique_db_tables:
            j = self.unique_db_tables[key]
            project = self.ownership.get_table_ownership(key)
            project_tables[project].append(j)
        # Add constraints for each project to ensure all tables are either on-premises or in the cloud
        for project in project_tables:
            project_var = project_vars[project]
            for j in project_tables[project]:
                # Note that we do not need to worry about replicated tables because the logic is that
                # If a table is replicated, it always satisfies such constraint
                # where the project locates, this table should be available
                self.model.addGenConstrIndicator(project_var, 0, z[j] == 0, name=f'z_project_{project}_{j}')
                self.model.addGenConstrIndicator(project_var, 1, w[j] == 0, name=f'w_project_{project}_{j}')
    self.y, self.z, self.w, self.u, self.v = (y, z, w, u, v)