in scheduler.py [0:0]
def place_query(self, template_id, cputime, table_volume_list,
                policy='size-predict', target_cloud_cpu_ratio=None, info=None):
    """Choose where a query runs and tally the traffic that choice causes.

    The placement recorded in ``self.query_map`` for ``template_id`` is used
    when present (and may be flipped toward the target cloud CPU ratio when
    ``self.yugong`` is falsy); otherwise the query is treated as new and
    placed according to ``policy``.

    Args:
        template_id: Key looked up in ``self.query_map``.
        cputime: CPU time of the query, forwarded to the statistics objects.
        table_volume_list: Iterable of ``(table, input_volume, output_volume)``
            triples, one per accessed table.
        policy: Placement policy for queries missing from ``self.query_map``:
            ``'independent'``, ``'size-predict'``, ``'size-aware'`` or
            ``'size-unaware'``.
        target_cloud_cpu_ratio: Desired cloud share, compared against
            ``self.get_cloud_computation_ratio()``. Must be a number on every
            path that performs that comparison.
        info: Not used by this method.

    Returns:
        ``(placement_y, egress, ingress)`` where ``placement_y`` is 0 for
        on-prem execution and 1 for cloud execution, and ``egress`` /
        ``ingress`` are the summed access volumes of tables missing on the
        executing side.

    Raises:
        ValueError: If the query is new and ``policy`` is not recognised.
    """
    placement_y = self.query_map.get(template_id, None)

    # Group-aware location map: a table that is packed into "<group>.group"
    # is resolved through its group's dataset entry.
    # Each value is an (on_prem, cloud) flag pair. Judging from how the flags
    # are used below, 0 means "available on that side" and 1 means "missing
    # there" -- NOTE(review): confirm against whatever fills dataset_map.
    all_tables_local, all_tables_cloud = True, True
    table_zw_map = {}
    for table, _, _ in table_volume_list:
        db_name, _, _ = table.partition('.')
        group_name = self.ownership.get_table_ownership(table) if self.yugong else db_name
        group_key = f"{group_name}.group"
        if table in self.dataset_map:
            on_prem, cloud = self.dataset_map[table]
        elif table not in self.table_size_map:
            # Unknown everywhere: can assume this table is small enough.
            continue
        elif group_key in self.dataset_map:
            on_prem, cloud = self.dataset_map[group_key]
        else:
            logging.warning(f"table {table} and {group_name}.group not found in dataset_map")
            continue
        table_zw_map[table] = (on_prem, cloud)
        all_tables_local &= on_prem == 0
        all_tables_cloud &= cloud == 0

    cloud_ratio = self.get_cloud_computation_ratio()
    input_volume = sum(volume for _, volume, _ in table_volume_list)
    output_volume = sum(volume for _, _, volume in table_volume_list)

    # Data-locality category used for statistics. "only_cloud" must test
    # all_tables_cloud; testing all_tables_local twice made "only_onprem"
    # unreachable and mislabelled both single-side categories.
    if all_tables_local and all_tables_cloud:
        category = "both_sides"
    elif all_tables_cloud:
        category = "only_cloud"
    elif all_tables_local:
        category = "only_onprem"
    else:
        category = "needs_transfer"

    if placement_y is not None:
        self.stat_categories[category].add(cputime=cputime,
                                           inputDataSize=input_volume,
                                           outputDataSize=output_volume)
        # Adjust the recorded decision only when it costs no data transfer
        # and moves the cloud share toward the target.
        if not self.yugong:
            if placement_y == 0 and all_tables_cloud and cloud_ratio < target_cloud_cpu_ratio:
                placement_y = 1
            elif placement_y == 1 and all_tables_local and cloud_ratio > target_cloud_cpu_ratio:
                placement_y = 0
    else:  # New query classification
        self.stat_categories_new[category].add(cputime=cputime,
                                               inputDataSize=input_volume,
                                               outputDataSize=output_volume)
        if policy == "independent":
            placement_y = 1 if cloud_ratio < target_cloud_cpu_ratio else 0
        elif policy in ['size-predict', 'size-aware', 'size-unaware']:
            # TODO: remove the magic number 0.05
            ratio_tolerance = 0.05
            if all_tables_local and all_tables_cloud:
                placement_y = 1 if cloud_ratio < target_cloud_cpu_ratio else 0
            elif all_tables_cloud:
                placement_y = 1
            elif all_tables_local:
                placement_y = 0
            elif cloud_ratio < target_cloud_cpu_ratio - ratio_tolerance:
                placement_y = 1
            elif cloud_ratio > target_cloud_cpu_ratio + ratio_tolerance:
                placement_y = 0
            else:
                # Ratio is inside the dead band: pick the side with the
                # smaller weighted amount of missing data.
                traffic_if_executed_cloud = 0
                traffic_if_executed_on_prem = 0
                for table, input_access, output_access in table_volume_list:
                    if table not in table_zw_map:
                        continue  # work-around: this table should be small and cold
                    if policy == 'size-predict':
                        # Default of 1 byte effectively omits the table, as it should be cold.
                        weight = self.weight_lookup.get(table, 1)
                    elif policy == 'size-aware':
                        weight = input_access + output_access
                    else:  # size-unaware
                        weight = 1
                    missing_on_prem, missing_on_cloud = table_zw_map[table]
                    if missing_on_prem == 1:
                        traffic_if_executed_on_prem += weight
                    if missing_on_cloud == 1:
                        traffic_if_executed_cloud += weight
                placement_y = 1 if traffic_if_executed_cloud < traffic_if_executed_on_prem else 0
        else:
            raise ValueError(f"Unknown policy: {policy}")

    # Update stats
    executed_side_stats = self.stat_cloud_query if placement_y == 1 else self.stat_on_prem_query
    executed_side_stats.add(cputime=cputime,
                            inputDataSize=input_volume,
                            outputDataSize=output_volume)

    egress = 0
    ingress = 0
    if placement_y == 0:  # job executed on-prem
        for table, input_access, output_access in table_volume_list:
            if table not in table_zw_map:
                continue
            if table_zw_map[table][0] == 1:  # but data cannot be found on-prem
                egress += input_access
                ingress += output_access
    else:  # cloud
        assert placement_y == 1  # job executed on cloud
        for table, input_access, output_access in table_volume_list:
            if table not in table_zw_map:
                continue
            if table_zw_map[table][1] == 1:  # but data cannot be found on cloud
                ingress += input_access
                egress += output_access
    return placement_y, egress, ingress