# scheduler.py
import argparse
import concurrent.futures
import fcntl
import logging
import os
import re
import shutil
from collections import defaultdict, OrderedDict
from datetime import datetime, timedelta
from enum import Enum
import numpy as np
import pandas as pd
from Yugong.Ownership import Ownership
from utility import human_readable_size, to_seconds, parse_size
# Define command-line arguments for the trace-replay evaluation script.
parser = argparse.ArgumentParser(description="Test optimization with upcoming jobs")
# --c: target share (percent) of total CPU time to place on the cloud
parser.add_argument("--c", type=int, default=30, help="Portion of compute to cloud")
parser.add_argument("--num_week", type=int, default=1, help="# of weeks for evaluation")
# --opt_path: directory holding the optimizer's placement outputs
parser.add_argument("--opt_path", type=str, default="sample_1.000", help="Optimization results stored under this path")
parser.add_argument("--debug", action="store_true", help="debug mode might just run part of the traces")
parser.add_argument("--yugong", action="store_true", help="Yugong mode: project-based job placement")
parser.add_argument("--simple", action="store_true", help="omit traffic rate calculation")
parser.add_argument("--policy", type=str, default="size-predict", help="scheduling policy",
                    choices=["size-predict", "size-aware", "size-unaware", "independent"])
# NOTE: parsed at import time — this module is intended to run as a script.
args = parser.parse_args()
class JobType(Enum):
    """Engine that produced a job trace row."""
    SPARK = "spark"
    PRESTO = "presto"
class Stat:
    """Accumulator for a group of jobs: count, CPU time, and I/O byte totals.

    Fix: the original also defined methods named ``count``, ``inputDataSize``
    and ``outputDataSize``; those were shadowed by the same-named instance
    attributes set in ``__init__`` (calling them raised ``TypeError: 'int'
    object is not callable``), so they were dead code and have been removed.
    Callers read the attributes directly (as the rest of this file does).
    """

    def __init__(self, help=None):
        # NOTE: `help` shadows the builtin but is kept for API compatibility.
        self.count = 0           # number of samples recorded via add()
        self.cputime = 0         # accumulated CPU time
        self.inputDataSize = 0   # accumulated input bytes
        self.outputDataSize = 0  # accumulated output bytes
        self.help = help         # label prefixed to the print() summary

    def add(self, cputime=0, inputDataSize=0, outputDataSize=0):
        """Record one job's resource usage."""
        self.count += 1
        self.cputime += cputime
        self.inputDataSize += inputDataSize
        self.outputDataSize += outputDataSize

    def get_cputime(self):
        """Return the accumulated CPU time."""
        return self.cputime

    def print(self):
        """Log a one-line summary; zero-valued fields are omitted."""
        log_str = f"{self.help}: count {self.count}"
        if self.cputime > 0:
            log_str += f", cputime {self.cputime:.4g}"
        if self.inputDataSize > 0:
            log_str += f", inputDataSize {human_readable_size(self.inputDataSize)}"
        if self.outputDataSize > 0:
            log_str += f", outputDataSize {human_readable_size(self.outputDataSize)}"
        logging.info(log_str)

    def clean(self):
        """Reset all accumulators to zero (the label is kept)."""
        self.count = 0
        self.cputime = 0
        self.inputDataSize = 0
        self.outputDataSize = 0
class Scheduler:
    """Routes each query to cloud (1) or on-prem (0).

    Combines precomputed placements (query_placement.csv / dataset_placement.csv
    produced by the optimizer under ``dir_path``) with an online policy for
    queries that have no precomputed placement, and accumulates traffic and
    compute statistics as queries are placed.
    """

    def __init__(self, dir_path, table_size_path, weight_lookup=None,
                 yugong=False, ownership=None):
        """Load placement maps and table sizes.

        Args:
            dir_path: directory holding query_placement.csv / dataset_placement.csv.
            table_size_path: CSV of per-table directory sizes.
            weight_lookup: table -> mean total bytes; used by 'size-predict'.
            yugong: if True, group tables by project ownership instead of db name.
            ownership: required in yugong mode; resolves a table's owning group.
        """
        assert os.path.exists(dir_path), f"dir_path={dir_path} not found"
        self.dir_path = dir_path
        logging.info(f"dir_path: {dir_path}, yugong mode: {yugong}")
        self.yugong = yugong
        if self.yugong:
            assert ownership is not None, "ownership is None"
            self.ownership = ownership
        # load template and dataset placements
        self.query_map = self._load_query_placement()
        logging.info(f"# of query placements: {len(self.query_map)}")
        self.dataset_map = self._load_dataset_placement()
        logging.info(f"# of dataset placements: {len(self.dataset_map)}")
        # load table sizes
        logging.info(f"Loaded table size: {table_size_path}")
        self.table_size_map = self._load_table_sizes(table_size_path)
        self.size_lookup = None
        self.db_table_size = None
        self.weight_lookup = weight_lookup
        # historical workload statistics
        self.stat_cloud_query = Stat("cloud queries")
        self.stat_on_prem_query = Stat("on-prem queries")
        # "Old" queries have a precomputed placement in query_map; "New" do not.
        self.stat_categories = {
            "both_sides": Stat("Old queries with tables on both sides"),      # all_tables_local and all_tables_cloud
            "only_cloud": Stat("Old queries with all tables on cloud"),       # not all_tables_local, all_tables_cloud
            "only_onprem": Stat("Old queries with all tables on-prem"),       # all_tables_local, not all_tables_cloud
            "needs_transfer": Stat("Old queries requiring ingress/egress"),   # neither
        }
        self.stat_categories_new = {
            "both_sides": Stat("New queries with tables on both sides"),
            "only_cloud": Stat("New queries with all tables on cloud"),
            "only_onprem": Stat("New queries with all tables on-prem"),
            "needs_transfer": Stat("New queries requiring ingress/egress"),
        }

    def get_cloud_computation_ratio(self) -> float:
        """Fraction of accumulated CPU time placed on the cloud (0.0 if none yet)."""
        total_cputime = self.stat_cloud_query.get_cputime() + self.stat_on_prem_query.get_cputime()
        return self.stat_cloud_query.get_cputime() / total_cputime if total_cputime else 0.0

    def stats(self):
        """Log all accumulated statistics."""
        self.stat_cloud_query.print()
        self.stat_on_prem_query.print()
        for stat in self.stat_categories.values():
            stat.print()
        for stat in self.stat_categories_new.values():
            stat.print()

    @staticmethod
    def _categorize(all_tables_local, all_tables_cloud):
        """Map table availability to a stat-category key.

        Bug fix: the original chained conditional tested ``all_tables_local``
        for both the 'only_cloud' and 'only_onprem' branches, so queries whose
        tables were all on-prem were miscounted as 'only_cloud' and the
        'only_onprem' branch was unreachable. Stats-only impact; placement
        decisions were not affected.
        """
        if all_tables_local and all_tables_cloud:
            return "both_sides"
        if all_tables_cloud:
            return "only_cloud"
        if all_tables_local:
            return "only_onprem"
        return "needs_transfer"

    def place_query(self, template_id, cputime, table_volume_list,
                    policy='size-predict', target_cloud_cpu_ratio=None, info=None):
        """Determine query placement based on dataset distribution and policy.

        Args:
            template_id: query template (or project name in yugong mode) used
                to look up a precomputed placement.
            cputime: CPU time of this query; drives the cloud-compute ratio.
            table_volume_list: [(table_name, input_volume, output_volume), ...]
            policy: 'size-predict' | 'size-aware' | 'size-unaware' | 'independent';
                consulted only when no precomputed placement exists.
            target_cloud_cpu_ratio: desired cloud share of total CPU time.
            info: opaque label (e.g. start time), diagnostics only.

        Returns:
            (placement_y, egress_bytes, ingress_bytes) with placement_y == 1
            for cloud execution and 0 for on-prem.
        """
        placement_y = self.query_map.get(template_id, None)
        all_tables_local, all_tables_cloud = True, True
        # group-aware: identify a table's location even if packed into ".group"
        table_zw_map = {}
        for table, _, _ in table_volume_list:
            db_name = table.partition('.')[0]
            group_name = self.ownership.get_table_ownership(table) if self.yugong else db_name
            if table not in self.table_size_map and table not in self.dataset_map:
                # can assume this table is small enough
                continue
            elif table not in self.dataset_map and f"{group_name}.group" not in self.dataset_map:
                logging.warning(f"table {table} and {group_name}.group not found in dataset_map")
                continue
            elif table not in self.dataset_map:
                on_prem, cloud = self.dataset_map[f"{group_name}.group"]
            else:
                on_prem, cloud = self.dataset_map[table]
            # z/w flags: a value of 1 appears to mean the data is MISSING on
            # that side (it drives the transfer accounting below) — confirm
            # against the optimizer's output format.
            table_zw_map[table] = (on_prem, cloud)
            all_tables_local &= on_prem == 0
            all_tables_cloud &= cloud == 0
        # Cloud share of CPU time accumulated so far (before this query).
        cloud_ratio = self.get_cloud_computation_ratio()
        input_volume = sum(volume for _, volume, _ in table_volume_list)
        output_volume = sum(volume for _, _, volume in table_volume_list)
        if placement_y is not None:
            category = self._categorize(all_tables_local, all_tables_cloud)
            self.stat_categories[category].add(cputime=cputime, inputDataSize=input_volume,
                                               outputDataSize=output_volume)
            # Adjust the precomputed placement toward the CPU target, but only
            # when the move is traffic-free (all tables present on the new side).
            if not self.yugong:
                if placement_y == 0 and all_tables_cloud and cloud_ratio < target_cloud_cpu_ratio:
                    placement_y = 1
                elif placement_y == 1 and all_tables_local and cloud_ratio > target_cloud_cpu_ratio:
                    placement_y = 0
        else:
            # New query (no precomputed placement): classify, then apply policy.
            category = self._categorize(all_tables_local, all_tables_cloud)
            self.stat_categories_new[category].add(cputime=cputime, inputDataSize=input_volume,
                                                   outputDataSize=output_volume)
            if policy == "independent":
                placement_y = 1 if cloud_ratio < target_cloud_cpu_ratio else 0
            elif policy in ['size-predict', 'size-aware', 'size-unaware']:
                if all_tables_local and all_tables_cloud:
                    placement_y = 1 if cloud_ratio < target_cloud_cpu_ratio else 0
                elif all_tables_cloud:
                    placement_y = 1
                elif all_tables_local:
                    placement_y = 0
                # TODO: remove the magic number 0.05 (dead band around the target)
                elif cloud_ratio < target_cloud_cpu_ratio - 0.05:
                    placement_y = 1
                elif cloud_ratio > target_cloud_cpu_ratio + 0.05:
                    placement_y = 0
                else:
                    # Near the target: pick the side that minimizes remote traffic.
                    traffic_if_executed_cloud = 0
                    traffic_if_executed_on_prem = 0
                    for table, input_access, output_access in table_volume_list:
                        if table not in table_zw_map:
                            continue  # work-around: this table should be small and cold
                        if policy == 'size-predict':
                            # 1-byte default effectively ignores unseen (cold) tables
                            weight = self.weight_lookup.get(table, 1)
                        elif policy == 'size-aware':
                            weight = input_access + output_access
                        else:  # size-unaware
                            weight = 1
                        if table_zw_map[table][0] == 1:
                            traffic_if_executed_on_prem += weight
                        if table_zw_map[table][1] == 1:
                            traffic_if_executed_cloud += weight
                    placement_y = 1 if traffic_if_executed_cloud < traffic_if_executed_on_prem else 0
            else:
                raise ValueError(f"Unknown policy: {policy}")
        # Update cloud/on-prem compute statistics.
        (self.stat_cloud_query if placement_y == 1 else self.stat_on_prem_query).add(
            cputime=cputime, inputDataSize=input_volume, outputDataSize=output_volume)
        # Any accessed table missing on the execution side generates traffic:
        # reads of remote data and write-back of outputs.
        egress = 0
        ingress = 0
        if placement_y == 0:  # job executed on-prem
            for table, input_access, output_access in table_volume_list:
                if table not in table_zw_map:
                    continue
                if table_zw_map[table][0] == 1:  # data not available on-prem
                    egress += input_access
                    ingress += output_access
        else:  # job executed on cloud
            assert placement_y == 1
            for table, input_access, output_access in table_volume_list:
                if table not in table_zw_map:
                    continue
                if table_zw_map[table][1] == 1:  # data not available on cloud
                    ingress += input_access
                    egress += output_access
        return placement_y, egress, ingress

    def _load_query_placement(self):
        """Load template-id -> placement (1=cloud, 0=on-prem); {} if the CSV is absent."""
        file_path = os.path.join(self.dir_path, 'query_placement.csv')
        if not os.path.exists(file_path):
            return {}
        df = pd.read_csv(file_path, delimiter=',', on_bad_lines='warn')
        df['abFP'] = df['abFP'].astype(str)
        df['y'] = df['y'].astype(int)
        return df.set_index('abFP')['y'].to_dict()

    def _load_dataset_placement(self):
        """Load table -> (z, w) placement flags from dataset_placement.csv."""
        df = pd.read_csv(os.path.join(self.dir_path, 'dataset_placement.csv'),
                         dtype={'table': str, 'size': float})
        df['z'] = df['z'].astype(int)
        df['w'] = df['w'].astype(int)
        df['table'] = df['table'].astype(str)
        logging.info(
            f"datalake size according to dataset_map: {human_readable_size(df['size'].sum() * 1024**3)}")
        # Ensure we return actual z and w values (not a nested dict)
        dataset_map = {row['table']: (row['z'], row['w']) for _, row in df.iterrows()}
        return dataset_map

    def _load_table_sizes(self, table_size_path):
        """Load table sizes and filter out zero-size entries."""
        df = pd.read_csv(table_size_path)
        df['hive_database_name'] = df['hive_database_name'].astype(str)
        df['hive_table_name'] = df['hive_table_name'].astype(str)
        df = df[df['dir_size'] > 0]  # Exclude tables with zero size
        df['full_table_name'] = df['hive_database_name'] + '.' + df['hive_table_name']
        return df.set_index('full_table_name')['dir_size'].to_dict()
def process_baseline(baseline: str, dir_path: str, num_of_week: int, c: int,
                     rep_rate: float,
                     traffic_rate_disabled: bool = False,
                     ):
    # Replays job traces week by week under a baseline strategy and logs
    # per-period traffic/compute statistics via log_period_statistics().
    #   baseline: baseline name; "rep_x_month" scales job traffic by rep_rate
    #   dir_path: directory with placement CSVs; receives routing.txt / log.csv
    #   num_of_week: number of one-week evaluation periods after the seed week
    #   c: target cloud compute share in percent (target ratio = c / 100)
    #   rep_rate: replication multiplier used by the "rep_x_month" baseline
    #   traffic_rate_disabled: skip per-minute traffic-rate bookkeeping
    period_day = 7
    policy = "size-predict"
    setup_logger(os.path.join(dir_path, f'routing.txt'))
    period_start = datetime.strptime("2024-10-22", "%Y-%m-%d")
    logging.info(f"Preparing the first df starting from {period_start}")
    # Header: start_time: str,job_id,template_id,duration,
    # uown_names,inputDataSize,outputDataSize,cputime, type
    # Seed week: only used to initialize weight_lookup (mean total bytes per
    # table), which the 'size-predict' policy consults.
    df_presto = pd.concat([read_Presto(period_start + timedelta(days=i)) for i in range(period_day)])
    df_spark = pd.concat([read_Spark(period_start + timedelta(days=i)) for i in range(period_day)])
    df = pd.concat([df_spark, df_presto])
    df['totalDataSize'] = df['inputDataSize'] + df['outputDataSize']
    weight_group = df.groupby(['table']).agg(
        totalDataSize=('totalDataSize', 'mean')).reset_index()
    weight_lookup = weight_group.set_index('table').to_dict()['totalDataSize']
    logging.info(f"# of jobs: {len(df['job_id'].unique())}")
    period_start = period_start + timedelta(days=period_day)
    """ to calculate traffic rate per minute, """
    minute_buckets = OrderedDict()  # OrderedDict keeps minute order for easy popping
    # store logs for each period
    period_logs = []
    for period_offset in range(num_of_week):
        start_date = period_start + timedelta(days=period_offset * period_day)
        df_presto = pd.concat([read_Presto(start_date + timedelta(days=i)) for i in range(period_day)])
        df_spark = pd.concat([read_Spark(start_date + timedelta(days=i)) for i in range(period_day)])
        df = pd.concat([df_spark, df_presto])
        df['totalDataSize'] = df['inputDataSize'] + df['outputDataSize']
        df = df.sort_values(['start_time', 'job_id'])
        logging.info(f"Week {period_offset + 1}, starting on {start_date}")
        logging.info(f"# of jobs: {len(df['job_id'].unique())}")
        # One group per job; rows of a job are its per-table accesses.
        jobs = df.groupby(['start_time', 'job_id'])
        scheduler = Scheduler(dir_path=os.path.join(dir_path),
                              table_size_path='report-table-size-20241021.csv',
                              weight_lookup=weight_lookup)  # TODO: stateful between periods
        egress_byte_Presto = 0
        ingress_byte_Presto = 0
        egress_byte_Spark = 0
        ingress_byte_Spark = 0
        # enumerate jobs
        for (start_time, job_id), group in jobs:
            job_type = group['type'].iloc[0]
            # Spark rows appear to carry job-level cputime (take the first),
            # while Presto rows are summed — TODO confirm trace schema.
            if job_type == JobType.SPARK:
                cputime = group['cputime'].iloc[0]
            else:
                cputime = group['cputime'].sum()
            template_id = group['template_id'].iloc[0]
            table_volume_list = [(row['table'], row['inputDataSize'], row['outputDataSize']) for _, row in
                                 group.iterrows()]
            placement_y, egress_byte, ingress_byte = scheduler.place_query(template_id, cputime, table_volume_list,
                                                                           policy=policy,
                                                                           target_cloud_cpu_ratio=c / 100,
                                                                           info=start_time)
            # "rep_x_month" baseline: scale job traffic by the replication rate.
            if baseline == "rep_x_month":
                egress_byte *= rep_rate
                ingress_byte *= rep_rate
            if job_type == JobType.SPARK:
                egress_byte_Spark += egress_byte
                ingress_byte_Spark += ingress_byte
            else:
                egress_byte_Presto += egress_byte
                ingress_byte_Presto += ingress_byte
            if not traffic_rate_disabled:
                """ traffic rate """
                # Spread this job's traffic uniformly over the minutes it ran.
                duration = group['duration'].iloc[0]
                # Spark and Presto traces use different timestamp precisions.
                if job_type == JobType.SPARK:
                    tStart = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S.%f")
                else:
                    tStart = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
                tEnd = tStart + timedelta(seconds=duration)
                start_minute = tStart.replace(second=0, microsecond=0)
                end_minute = (tEnd + timedelta(seconds=59)).replace(second=0, microsecond=0)
                # Flush expired minute buckets (older than job_start_minute)
                # TODO: we can not flush as 'start_time' (str) is not the correct index
                # flush_oldest_minute_buckets(minute_buckets, start_minute, os.path.join(dir_path, f"c{c}"))
                min = start_minute  # NOTE(review): 'min' shadows the builtin; consider renaming
                total_minute = (end_minute - start_minute).total_seconds() / 60
                while min < end_minute:
                    if min not in minute_buckets:
                        minute_buckets[min] = {'egress_byte_Presto': 0, 'ingress_byte_Presto': 0,
                                               'egress_byte_Spark': 0, 'ingress_byte_Spark': 0}
                    if job_type == JobType.SPARK:
                        minute_buckets[min]['egress_byte_Spark'] += egress_byte / total_minute
                        minute_buckets[min]['ingress_byte_Spark'] += ingress_byte / total_minute
                    else:
                        minute_buckets[min]['egress_byte_Presto'] += egress_byte / total_minute
                        minute_buckets[min]['ingress_byte_Presto'] += ingress_byte / total_minute
                    min += timedelta(minutes=1)
        # Refresh per-table weights with this week's observations.
        new_weight_group = df.groupby(['table']).agg(
            totalDataSize=('totalDataSize', 'mean')).reset_index()
        new_weight_lookup = new_weight_group.set_index('table').to_dict()['totalDataSize']
        weight_lookup.update(new_weight_lookup)
        logging.info(f"Egress {human_readable_size(egress_byte_Presto + egress_byte_Spark)}: "
                     f"Presto {human_readable_size(egress_byte_Presto)}, Spark {human_readable_size(egress_byte_Spark)}")
        logging.info(f"Ingress {human_readable_size(ingress_byte_Presto + ingress_byte_Spark)}: "
                     f"Presto {human_readable_size(ingress_byte_Presto)}, Spark {human_readable_size(ingress_byte_Spark)}")
        # Log period statistics
        period_logs.append({
            "start_date": start_date,
            "end_date": start_date + timedelta(days=period_day - 1),
            "scheduling_policy": policy,
            "c": c,
            "cloud_compute_ratio": scheduler.get_cloud_computation_ratio(),  # Store only the ratio
            "egress_byte_Presto": egress_byte_Presto,
            "ingress_byte_Presto": ingress_byte_Presto,
            "egress_byte_Spark": egress_byte_Spark,
            "ingress_byte_Spark": ingress_byte_Spark,
            "dir_path": dir_path,
            "opt_dir_path": None  # baselines have no optimizer output directory
        })
    if not traffic_rate_disabled:
        # Flush remaining minute buckets
        flush_oldest_minute_buckets(minute_buckets, None, dir_path)
    # Now log all stored period statistics in a single batch
    for log_entry in period_logs:
        log_period_statistics(
            log_entry["start_date"],
            log_entry["end_date"],
            log_entry["scheduling_policy"],
            log_entry["c"],
            log_entry["cloud_compute_ratio"],  # Only store ratio instead of full scheduler object
            log_entry["egress_byte_Presto"],
            log_entry["ingress_byte_Presto"],
            log_entry["egress_byte_Spark"],
            log_entry["ingress_byte_Spark"],
            log_entry["dir_path"],
            log_entry["opt_dir_path"],
            traffic_rate_disabled=traffic_rate_disabled,
            rep_rate=rep_rate
        )
def process_jobs(c, num_of_week, dir_path, debug, policy, traffic_rate_disabled=False):
    # Replays job traces week by week against per-week optimizer outputs
    # (the test_run_c{c}_... subdirectories) and logs per-period statistics.
    #   c: target cloud compute share in percent (target ratio = c / 100)
    #   num_of_week: number of one-week evaluation periods after the seed week
    #   dir_path: parent directory of optimizer outputs; receives logs
    #   debug: if True, only the first 3000 trace rows are replayed
    #   policy: scheduling policy passed to Scheduler.place_query
    #   traffic_rate_disabled: skip per-minute traffic-rate bookkeeping
    # create c30 or c50 or c70 directory under parent to store traffic_rate
    if os.path.exists(os.path.join(dir_path, f"c{c}")) and not traffic_rate_disabled:
        shutil.rmtree(os.path.join(dir_path, f"c{c}"), ignore_errors=False)
    os.makedirs(f"{dir_path}/c{c}", exist_ok=True)
    period_day = 7
    setup_logger(os.path.join(dir_path, f'routing_c{c}_{policy}.txt'))
    logging.info(f"Start processing jobs with c={c}, num_of_week={num_of_week}, dir_path={dir_path}, debug={debug}, policy={policy}, traffic_rate_disabled={traffic_rate_disabled}")
    # prepare scheduler
    period_start = datetime.strptime("2024-10-22", "%Y-%m-%d")
    logging.info(f"Preparing the first df starting from {period_start}")
    # Header: start_time: str,job_id,template_id,duration,
    # uown_names,inputDataSize,outputDataSize,cputime, type
    # Seed week: only used to initialize weight_lookup (mean total bytes per
    # table), which the 'size-predict' policy consults.
    df_presto = pd.concat([read_Presto(period_start + timedelta(days=i)) for i in range(period_day)])
    df_spark = pd.concat([read_Spark(period_start + timedelta(days=i)) for i in range(period_day)])
    df = pd.concat([df_spark, df_presto])
    df['totalDataSize'] = df['inputDataSize'] + df['outputDataSize']
    weight_group = df.groupby(['table']).agg(
        totalDataSize=('totalDataSize', 'mean')).reset_index()
    weight_lookup = weight_group.set_index('table').to_dict()['totalDataSize']
    logging.info(f"# of jobs: {len(df['job_id'].unique())}")
    period_start = period_start + timedelta(days=period_day)
    """ to calculate traffic rate per minute, """
    minute_buckets = OrderedDict()  # OrderedDict keeps minute order for easy popping
    # store logs for each period
    period_logs = []
    for period_offset in range(num_of_week):
        start_date = period_start + timedelta(days=period_offset * period_day)
        # Optimizer output dirs for later weeks carry an "_MMDD" suffix of the
        # previous week's start date; the first week has no suffix.
        if period_offset == 0:
            label = ""
        else:
            label = "_" + (start_date - timedelta(days=period_day)).strftime("%m%d")
        # Header: start_time,job_id,template_id,duration,
        # uown_names,inputDataSize,cputime, type
        # TODO: this can be parallelized
        df_presto = pd.concat([read_Presto(start_date + timedelta(days=i)) for i in range(period_day)])
        df_spark = pd.concat([read_Spark(start_date + timedelta(days=i)) for i in range(period_day)])
        df = pd.concat([df_spark, df_presto])
        df['totalDataSize'] = df['inputDataSize'] + df['outputDataSize']
        df = df.sort_values(['start_time', 'job_id'])
        print("first 5 jobs", df.head())
        logging.info(f"Week {period_offset + 1}, starting on {start_date}")
        logging.info(f"# of jobs: {len(df['job_id'].unique())}")
        if debug:
            print(f"debug mode: retain first 3K rows", flush=True)
            # retain first 3K rows
            jobs = df.head(3000).groupby(['start_time', 'job_id'])
            print(jobs.head(1))
        else:
            jobs = df.groupby(['start_time', 'job_id'])
        # prepare scheduler with optimization results; table-size snapshot is
        # chosen by date (older snapshot for weeks before 2024-05-13)
        scheduler = Scheduler(dir_path=os.path.join(dir_path, f"test_run_c{c}_bw0.02_local{100-c}{label}"),
                              table_size_path='report-table-size-0907.csv' if start_date < datetime.strptime("2024-05-13", "%Y-%m-%d") else 'report-table-size-20241021.csv',
                              weight_lookup=weight_lookup)  # TODO: stateful between periods
        egress_byte_Presto = 0
        ingress_byte_Presto = 0
        egress_byte_Spark = 0
        ingress_byte_Spark = 0
        # "hybrid" jobs: jobs whose placement incurred any cross-site traffic
        hybrid_job_count = 0
        hybrid_job_bytes = 0
        # enumerate jobs
        for (start_time, job_id), group in jobs:
            job_type = group['type'].iloc[0]
            # Spark rows appear to carry job-level cputime (take the first),
            # while Presto rows are summed — TODO confirm trace schema.
            if job_type == JobType.SPARK:
                cputime = group['cputime'].iloc[0]
            else:
                cputime = group['cputime'].sum()
            template_id = group['template_id'].iloc[0]
            table_volume_list = [(row['table'], row['inputDataSize'], row['outputDataSize']) for _, row in group.iterrows()]
            placement_y, egress_byte, ingress_byte = scheduler.place_query(template_id, cputime, table_volume_list,
                                                                           policy=policy,
                                                                           target_cloud_cpu_ratio=c / 100,
                                                                           info=start_time)
            if job_type == JobType.SPARK:
                egress_byte_Spark += egress_byte
                ingress_byte_Spark += ingress_byte
            else:
                egress_byte_Presto += egress_byte
                ingress_byte_Presto += ingress_byte
            if egress_byte + ingress_byte > 0:
                hybrid_job_count += 1
                hybrid_job_bytes += sum([input + output for _, input, output in table_volume_list])
            if not traffic_rate_disabled:
                """ traffic rate """
                # Spread this job's traffic uniformly over the minutes it ran.
                duration = group['duration'].iloc[0]
                # Spark and Presto traces use different timestamp precisions.
                if job_type == JobType.SPARK:
                    tStart = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S.%f")
                else:
                    tStart = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
                tEnd = tStart + timedelta(seconds=duration)
                start_minute = tStart.replace(second=0, microsecond=0)
                end_minute = (tEnd + timedelta(seconds=59)).replace(second=0, microsecond=0)
                # Flush expired minute buckets (older than job_start_minute)
                # TODO: we can not flush as 'start_time' (str) is not the correct index
                # flush_oldest_minute_buckets(minute_buckets, start_minute, os.path.join(dir_path, f"c{c}"))
                min = start_minute  # NOTE(review): 'min' shadows the builtin; consider renaming
                total_minute = (end_minute - start_minute).total_seconds() / 60
                while min < end_minute:
                    if min not in minute_buckets:
                        minute_buckets[min] = {'egress_byte_Presto': 0, 'ingress_byte_Presto': 0,
                                               'egress_byte_Spark': 0, 'ingress_byte_Spark': 0}
                    if job_type == JobType.SPARK:
                        minute_buckets[min]['egress_byte_Spark'] += egress_byte / total_minute
                        minute_buckets[min]['ingress_byte_Spark'] += ingress_byte / total_minute
                    else:
                        minute_buckets[min]['egress_byte_Presto'] += egress_byte / total_minute
                        minute_buckets[min]['ingress_byte_Presto'] += ingress_byte / total_minute
                    min += timedelta(minutes=1)
        # Refresh per-table weights with this week's observations.
        new_weight_group = df.groupby(['table']).agg(
            totalDataSize=('totalDataSize', 'mean')).reset_index()
        new_weight_lookup = new_weight_group.set_index('table').to_dict()['totalDataSize']
        weight_lookup.update(new_weight_lookup)
        logging.info(f"Egress {human_readable_size(egress_byte_Presto + egress_byte_Spark)}: "
                     f"Presto {human_readable_size(egress_byte_Presto)}, Spark {human_readable_size(egress_byte_Spark)}")
        logging.info(f"Ingress {human_readable_size(ingress_byte_Presto + ingress_byte_Spark)}: "
                     f"Presto {human_readable_size(ingress_byte_Presto)}, Spark {human_readable_size(ingress_byte_Spark)}")
        logging.info(f"# of hybrid jobs: {hybrid_job_count} with access bytes: {human_readable_size(hybrid_job_bytes)}")
        # Log period statistics
        period_logs.append({
            "start_date": start_date,
            "end_date": start_date + timedelta(days=period_day-1),
            "scheduling_policy": policy,
            "c": c,
            "cloud_compute_ratio": scheduler.get_cloud_computation_ratio(),  # Store only the ratio
            "egress_byte_Presto": egress_byte_Presto,
            "ingress_byte_Presto": ingress_byte_Presto,
            "egress_byte_Spark": egress_byte_Spark,
            "ingress_byte_Spark": ingress_byte_Spark,
            "dir_path": dir_path,
            "opt_dir_path": os.path.join(dir_path, f"test_run_c{c}_bw0.02_local{100-c}{label}")
        })
    if not traffic_rate_disabled:
        # Flush remaining minute buckets
        flush_oldest_minute_buckets(minute_buckets, None, os.path.join(dir_path, f"c{c}"))
    # Now log all stored period statistics in a single batch
    for log_entry in period_logs:
        log_period_statistics(
            log_entry["start_date"],
            log_entry["end_date"],
            log_entry["scheduling_policy"],
            log_entry["c"],
            log_entry["cloud_compute_ratio"],  # Only store ratio instead of full scheduler object
            log_entry["egress_byte_Presto"],
            log_entry["ingress_byte_Presto"],
            log_entry["egress_byte_Spark"],
            log_entry["ingress_byte_Spark"],
            log_entry["dir_path"],
            log_entry["opt_dir_path"],
            traffic_rate_disabled=traffic_rate_disabled
        )
def Moirai_weekly_cost_print(opt_path):
    """Print weekly traffic volume and cost breakdown per cloud-compute target.

    Reads <opt_path>/log.csv (written by log_period_statistics) and, for each
    cloud_computation_target, prints the mean weekly traffic volume, the peak
    P95 bandwidth, and estimated network / egress / replication costs.

    Fix: network_cost previously used df['P95_traffic_bps'].max() (all
    targets) instead of df_c (the current target), so every target printed the
    same, possibly wrong, network cost — inconsistent with the "Bandwidth"
    line printed just above it.
    """
    dir_path = opt_path
    if not os.path.exists(dir_path) or not os.path.exists(os.path.join(dir_path, "log.csv")):
        print("No log.csv found")
        return
    # header: period, mode, cloud_computation_ratio, cloud_computation_target,
    # ingress_byte_Presto,egress_byte_Presto,ingress_byte_Spark,egress_byte_Spark,
    # P90_traffic_bps, P95_traffic_bps, P99_traffic_bps,
    # movement_ingress_bytes, movement_egress_bytes, rep_bytes, sample_rate
    df = pd.read_csv(os.path.join(dir_path, "log.csv"))
    df['job_ingress_bytes'] = df['ingress_byte_Presto'] + df['ingress_byte_Spark']
    df['job_egress_bytes'] = df['egress_byte_Presto'] + df['egress_byte_Spark']
    df['traffic_volume'] = (df['job_ingress_bytes'] + df['job_egress_bytes']
                            + df['movement_ingress_bytes'] + df['movement_egress_bytes'])
    df['egress_volume'] = df['job_egress_bytes'] + df['movement_egress_bytes']
    for c in df['cloud_computation_target'].unique():
        df_c = df[df['cloud_computation_target'] == c]
        print(f"Cloud computation target: {c}")
        print(f"Weekly traffic volume: {human_readable_size(df_c['traffic_volume'].mean())}")
        print("Bandwidth:", df_c['P95_traffic_bps'].max())
        # Cost model (presumably: bandwidth priced per 100 Gbps-hour over a
        # week, egress at $0.02/GiB, replication at $0.023/GiB-month, /4 per
        # week — TODO confirm the pricing constants).
        network_cost = df_c['P95_traffic_bps'].max() / (100*1024**3) * 24 * 7
        egress_cost = df_c['egress_volume'].mean() / 1024**3 * 0.02
        rep_cost = df_c['rep_bytes'].mean() / 1024**3 * 0.023 / 4
        print(f"Network cost: {network_cost:.0f}, Egress cost: {egress_cost:.0f}, Replication cost: {rep_cost:.0f}")
def extract_movement_rep_and_sample(log_file):
    """Parse an optimizer log.txt for movement bytes, replication size, sample rate.

    Returns (movement_ingress_bytes, movement_egress_bytes, replication_size,
    sample_rate); bytes default to 0, the other two to None when not found.
    Later matching lines in the file override earlier ones.
    """
    ingress_bytes = egress_bytes = 0
    replication_size = None
    sample_rate = None
    if not os.path.exists(log_file):
        return ingress_bytes, egress_bytes, replication_size, sample_rate
    with open(log_file, "r") as handle:
        for row in handle:
            # Data movement line: "<X>B ingress, <Y>B ... egress"
            moved = re.search(r"Data movement:\s([\d.]+\s*[A-Z]*B) ingress,\s([\d.]+\s*[A-Z]*B) .* egress",
                              row)
            if moved:
                ingress_bytes = parse_size(moved.group(1))
                egress_bytes = parse_size(moved.group(2))
            # Replication size reported as the storage "overlap"
            overlap = re.search(r"Storage: .*?(\d+\.\d+\s*[A-Z]B)\s+overlap", row)
            if overlap:
                replication_size = parse_size(overlap.group(1))
            # Sample rate: full sample (k=1.000) or fractional (k=0.XXX)
            if re.search(r"k=(1\.000)", row):
                sample_rate = 1.0
            fractional = re.search(r"k=(0\.\d+)", row)
            if fractional:
                sample_rate = float(fractional.group(1))
    return ingress_bytes, egress_bytes, replication_size, sample_rate
def calculate_traffic_percentiles(traffic_dir: str, start_date: datetime, end_date: datetime, debug: bool = False):
    """Compute P90/P95/P99 of per-minute traffic rates over a date range.

    Reads traffic_YYYYMMDD.csv files from *traffic_dir* (inclusive range),
    sums egress+ingress across both engines per minute, and returns the
    three percentiles as ints. Returns (None, None, None) if no file exists.
    """
    rates = []
    for day in pd.date_range(start_date, end_date):
        path = os.path.join(traffic_dir, f"traffic_{day.strftime('%Y%m%d')}.csv")
        if not os.path.exists(path):
            print(f"Traffic file not found: {path}")
            continue
        daily = pd.read_csv(path)
        daily['egress_rate_bps'] = daily['egress_rate_presto_bps'] + daily['egress_rate_spark_bps']
        daily['ingress_rate_bps'] = daily['ingress_rate_presto_bps'] + daily['ingress_rate_spark_bps']
        daily['traffic_rate_bps'] = daily['egress_rate_bps'] + daily['ingress_rate_bps']
        # A full day should hold exactly 24 * 60 minute rows.
        if debug and len(daily) != 1440:
            print(f"Check {path}: {len(daily)}")
        rates.extend(daily["traffic_rate_bps"].tolist())
    if not rates:
        return None, None, None  # No data found
    return (
        int(np.percentile(rates, 90)),
        int(np.percentile(rates, 95)),
        int(np.percentile(rates, 99)),
    )
def log_period_statistics(start_date: datetime, end_date: datetime, scheduling_policy: str, c: int,
                          cloud_compute_rate: float,
                          egress_byte_Presto, ingress_byte_Presto,
                          egress_byte_Spark, ingress_byte_Spark,
                          dir_path, opt_dir_path, traffic_rate_disabled=False,
                          rep_rate=None
                          ):
    """Append one period's summary row to <dir_path>/log.csv.

    Gathers data-movement and replication figures from the optimizer log (or
    synthesizes them for baselines, where opt_dir_path is None), optionally
    computes traffic percentiles, and writes a CSV line under a file lock so
    concurrent runs do not interleave.
    """
    period_str = f"{start_date.strftime('%Y%m%d')}-{end_date.strftime('%Y%m%d')}"
    cloud_computation_rate_str = f"{cloud_compute_rate*100:.2f}%"  # actual achieved ratio
    cloud_computation_target = c
    # Extract movement bytes
    if opt_dir_path is None:
        # Baseline run: no optimizer output; replication is sized from rep_rate
        # against the full datalake (presumably 299.12 PB — confirm constant).
        movement_ingress = movement_egress = 0
        rep_size = rep_rate * parse_size("299.12PB")
        sample_rate = 1.0
    else:
        movement_ingress, movement_egress, rep_size, sample_rate = extract_movement_rep_and_sample(
            os.path.join(opt_dir_path, "log.txt"))
    if traffic_rate_disabled:
        p90 = p95 = p99 = None
    else:
        # Compute traffic percentiles; baselines (rep_rate set) store traffic
        # CSVs directly under dir_path, otherwise under the c{c} subdirectory.
        if rep_rate is not None:
            print("Evaluating baselines")
            traffic_dir = dir_path
        else:
            traffic_dir = os.path.join(dir_path, f"c{c}")
        p90, p95, p99 = calculate_traffic_percentiles(traffic_dir, start_date, end_date)
    # Format log line
    log_entry = (f"{period_str},{scheduling_policy},{cloud_computation_rate_str},{cloud_computation_target},"
                 f"{ingress_byte_Presto},{egress_byte_Presto},"
                 f"{ingress_byte_Spark},{egress_byte_Spark},"
                 f"{p90},{p95},{p99},{movement_ingress},{movement_egress},"
                 f"{rep_size},{sample_rate}\n")
    log_file = os.path.join(dir_path, f"log.csv")
    write_header = not os.path.exists(log_file)  # Write header if file does not exist
    try:
        with open(log_file, "a") as f:
            fcntl.flock(f, fcntl.LOCK_EX)  # Lock file to prevent interference
            if write_header:
                f.write(
                    "period,mode,cloud_computation_ratio,cloud_computation_target,"
                    "ingress_byte_Presto,egress_byte_Presto,"
                    "ingress_byte_Spark,egress_byte_Spark,"
                    "P90_traffic_bps,P95_traffic_bps,P99_traffic_bps,"
                    "movement_ingress_bytes,movement_egress_bytes,rep_bytes,sample_rate\n")
            f.write(log_entry)
            fcntl.flock(f, fcntl.LOCK_UN)  # Unlock file after writing
        logging.info(f"Logged period statistics to log.csv: {log_entry.strip()}")
    except Exception as e:
        logging.error(f"Error writing to log.csv: {e}")
def read_Spark(date: datetime):
    """Load one day's Spark job trace and normalize its columns.

    Reads jobTraces/YYYYMMDD-Spark.csv, forces the name columns to str (so
    missing values become "nan" rather than NaN floats), derives the combined
    'table' column and tags rows with JobType.SPARK.
    """
    print(f"Reading Spark jobs for {date}")
    column_dtypes = {
        'job_id': str, 'start_time': str, 'duration': float,
        'cputime': float, 'db_name': str, 'table_name': str,
        'inputDataSize': float,
        'outputDataSize': float, 'template_id': str
    }
    df = pd.read_csv(f"jobTraces/{date.strftime('%Y%m%d')}-Spark.csv",
                     dtype=column_dtypes, na_values=['\\N'])
    for column in ('db_name', 'table_name', 'template_id'):
        df[column] = df[column].astype(str)
    df['table'] = df['db_name'] + '.' + df['table_name']
    df['type'] = JobType.SPARK
    keep = ['job_id', 'start_time', 'duration', 'cputime', 'table', 'inputDataSize',
            'outputDataSize', 'template_id', 'type', 'db_name', 'table_name', 'uown_names']
    return df[keep]
def read_Presto(date: datetime):
    """Load one day of Presto job traces from jobTraces/YYYYMMDD-Presto.csv.

    Returns a DataFrame with one row per (job, table) record, tagged with
    JobType.PRESTO and a combined 'table' column (db_name.table_name).
    """
    print(f"Reading Presto jobs for {date}")
    df = pd.read_csv(f"jobTraces/{date.strftime('%Y%m%d')}-Presto.csv", dtype={
        'job_id': str, 'start_time': str, 'duration': float,
        'cputime': float, 'db_name': str, 'table_name': str,
        #'uown_names': str,
        'inputDataSize': float,
        'outputDataSize': float, 'template_id': str
    }, na_values=['\\N'])
    # Consistency fix (mirrors read_Spark): with na_values=['\N'] a missing
    # db_name/table_name comes back as NaN, and without the str cast the
    # concatenation below yields NaN 'table' keys instead of usable strings.
    df['db_name'] = df['db_name'].astype(str)
    df['table_name'] = df['table_name'].astype(str)
    df['template_id'] = df['template_id'].astype(str)
    df['type'] = JobType.PRESTO
    df['table'] = df['db_name'] + '.' + df['table_name']
    df = df[['job_id', 'start_time', 'duration', 'cputime', 'table', 'inputDataSize',
             'outputDataSize', 'template_id', 'type', 'db_name', 'table_name', 'uown_names']]
    return df
def setup_logger(log_path):
    """Route root-logger INFO records to a single file at log_path.

    Any previously attached handlers are dropped first, so repeated calls
    (one per period/config) never produce duplicate log lines.
    """
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    # Start from a clean slate: drop whatever handlers earlier runs attached.
    if root.hasHandlers():
        root.handlers.clear()
    handler = logging.FileHandler(log_path, mode='a')
    handler.setLevel(logging.INFO)
    handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    root.addHandler(handler)
def flush_oldest_minute_buckets(minute_buckets, cutoff_minute, dir_path):
    """Drain minute-level traffic counters older than cutoff_minute to per-day CSVs.

    minute_buckets is an OrderedDict keyed by minute (oldest first); each value
    holds per-engine ingress/egress byte counters. Bytes are converted to an
    average bits-per-second rate over the minute. cutoff_minute=None drains
    every bucket.
    """
    def _bps(byte_count):
        # Average rate over one minute, in bits per second.
        return int((byte_count * 8) / 60)

    while minute_buckets:
        oldest = next(iter(minute_buckets))
        if cutoff_minute is not None and oldest >= cutoff_minute:
            break
        minute, traffic = minute_buckets.popitem(last=False)  # remove oldest entry
        out_file = os.path.join(dir_path, f"traffic_{minute.date().strftime('%Y%m%d')}.csv")
        row = pd.DataFrame([{
            'minute': minute.strftime('%H:%M'),
            'egress_rate_presto_bps': _bps(traffic['egress_byte_Presto']),
            'ingress_rate_presto_bps': _bps(traffic['ingress_byte_Presto']),
            'egress_rate_spark_bps': _bps(traffic['egress_byte_Spark']),
            'ingress_rate_spark_bps': _bps(traffic['ingress_byte_Spark']),
        }])
        # TODO: batch writes to reduce I/O
        row.to_csv(out_file, mode='a', index=False, header=not os.path.exists(out_file))
# Used for bug fixing, ignore it
def iterate_logs(opt_path, num_of_week, yugong):
    """Re-scan per-config run directories and rebuild fix.csv from their logs."""
    period_day = 7
    period_start = datetime.strptime("2024-10-29", "%Y-%m-%d")
    # Yugong runs were produced with a different bandwidth tag in the dir name.
    bw_tag = "bw0.20" if yugong else "bw0.02"
    with open(f"{opt_path}/fix.csv", "w") as out:
        out.write("period,cloud_computation_target,P90_traffic_bps,P95_traffic_bps,P99_traffic_bps,"
                  "movement_ingress_bytes,movement_egress_bytes,rep_bytes,sample_rate\n")
        for c in [30, 50, 70]:
            for week in range(num_of_week):
                start_date = period_start + timedelta(days=week * period_day)
                end_date = start_date + timedelta(days=period_day - 1)
                p90, p95, p99 = calculate_traffic_percentiles(
                    os.path.join(opt_path, f"c{c}"), start_date, end_date, debug=True)
                # First week's run dir has no suffix; later weeks are tagged with
                # the previous period's start date (mirrors process_yugong).
                label = "" if week == 0 else "_" + (start_date - timedelta(days=period_day)).strftime("%m%d")
                run_dir = os.path.join(opt_path, f"test_run_c{c}_{bw_tag}_local{100-c}{label}")
                log_file = os.path.join(run_dir, "log.txt")
                if not os.path.exists(log_file):
                    continue
                movement_ingress, movement_egress, rep_size, sample_rate = extract_movement_rep_and_sample(log_file)
                out.write(f"{start_date.strftime('%Y%m%d')}-{end_date.strftime('%Y%m%d')},"
                          f"{c},{p90},{p95},{p99},{movement_ingress},{movement_egress},{rep_size},{sample_rate}\n")
def process_yugong(c, num_of_week, dir_path, debug):
    """Replay job traces week by week under Yugong project-based placement.

    Args:
        c: target percentage of compute pushed to the cloud (e.g. 30/50/70).
        num_of_week: number of 7-day periods to simulate, starting 2024-10-29.
        dir_path: parent directory holding the optimization results; per-minute
            traffic CSVs are (re)created under dir_path/c{c}.
        debug: if True, only the first 3K rows of each week's trace are replayed.

    Side effects: writes traffic_*.csv files under dir_path/c{c}, appends to
    routing_c{c}.txt, and (via log_period_statistics) logs one row per period.
    """
    # create c30 or c50 or c70 directory under parent to store traffic_rate
    if os.path.exists(os.path.join(dir_path, f"c{c}")):
        shutil.rmtree(os.path.join(dir_path, f"c{c}"), ignore_errors=False)
    os.makedirs(f"{dir_path}/c{c}", exist_ok=True)
    period_day = 7
    setup_logger(os.path.join(dir_path, f'routing_c{c}.txt'))
    # prepare scheduler
    period_start = datetime.strptime("2024-10-29", "%Y-%m-%d")
    # Per-minute traffic accumulator; OrderedDict keeps minute order for easy popping.
    minute_buckets = OrderedDict()
    # store logs for each period
    period_logs = []
    for period_offset in range(num_of_week):
        start_date = period_start + timedelta(days=period_offset * period_day)
        # First week's run dir has no suffix; later weeks are tagged with the
        # previous period's start date.
        if period_offset == 0:
            label = ""
        else:
            label = "_" + (start_date - timedelta(days=period_day)).strftime("%m%d")
        # Header: start_time,job_id,template_id,duration,
        # uown_names,inputDataSize,cputime, type
        # TODO: this can be parallelized
        df_presto = pd.concat([read_Presto(start_date + timedelta(days=i)) for i in range(period_day)])
        df_spark = pd.concat([read_Spark(start_date + timedelta(days=i)) for i in range(period_day)])
        df = pd.concat([df_spark, df_presto])
        df['totalDataSize'] = df['inputDataSize'] + df['outputDataSize']
        df = df.sort_values(['start_time', 'job_id'])
        print("first 5 jobs", df.head())
        logging.info(f"Week {period_offset + 1}, starting on {start_date}")
        logging.info(f"# of jobs: {len(df['job_id'].unique())}")
        if debug:
            print("debug mode: retain first 3K rows", flush=True)
            # retain first 3K rows
            jobs = df.head(3000).groupby(['start_time', 'job_id'])
            print(jobs.head(1))
        else:
            jobs = df.groupby(['start_time', 'job_id'])
        # prepare ownership info for query and table
        ownership = Ownership()
        #print(f"# of unique query ownership after processing: {df['uown_names'].nunique()}", flush=True)
        table_df = pd.read_csv("report-table-size-20241021.csv",
                               dtype={'hive_database_name': str, 'hive_table_name': str, 'uown_names': str},
                               na_values=['\\N'])
        table_df['table'] = table_df['hive_database_name'] + '.' + table_df['hive_table_name']
        for table, uown_names in zip(table_df['table'], table_df['uown_names']):
            if pd.isna(uown_names):  # Check for NaN values
                continue
            # print(f"Table {table} has ownership {uown_names}", flush=True)
            ownership.add_table_ownership(table, uown_names)
        # prepare scheduler with optimization results
        scheduler = Scheduler(dir_path=os.path.join(dir_path, f"test_run_c{c}_bw0.20_local{100-c}{label}"),
                              table_size_path='report-table-size-0907.csv' if start_date < datetime.strptime("2024-05-13", "%Y-%m-%d") else 'report-table-size-20241021.csv',
                              yugong=True, weight_lookup=None, ownership=ownership)
        egress_byte_Presto = 0
        ingress_byte_Presto = 0
        egress_byte_Spark = 0
        ingress_byte_Spark = 0
        # enumerate jobs
        for (start_time, job_id), group in jobs:
            job_type = group['type'].iloc[0]
            # Spark traces carry one cputime per job; Presto rows are per-table,
            # so their cputime must be summed across the group.
            if job_type == JobType.SPARK:
                cputime = group['cputime'].iloc[0]
            else:
                cputime = group['cputime'].sum()
            template_id = group['uown_names'].iloc[0]
            table_volume_list = [(row['table'], row['inputDataSize'], row['outputDataSize']) for _, row in group.iterrows()]
            placement_y, egress_byte, ingress_byte = scheduler.place_query(template_id, cputime, table_volume_list,
                                                                           policy='size-predict',
                                                                           target_cloud_cpu_ratio=c / 100,
                                                                           info=start_time)
            if job_type == JobType.SPARK:
                egress_byte_Spark += egress_byte
                ingress_byte_Spark += ingress_byte
            else:
                egress_byte_Presto += egress_byte
                ingress_byte_Presto += ingress_byte
            # traffic rate: spread this job's bytes evenly over the minutes it spans
            duration = group['duration'].iloc[0]
            # Spark timestamps carry fractional seconds; Presto ones do not.
            if job_type == JobType.SPARK:
                tStart = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S.%f")
            else:
                tStart = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
            tEnd = tStart + timedelta(seconds=duration)
            start_minute = tStart.replace(second=0, microsecond=0)
            end_minute = (tEnd + timedelta(seconds=59)).replace(second=0, microsecond=0)
            # Flush expired minute buckets (older than job_start_minute)
            #flush_oldest_minute_buckets(minute_buckets, start_minute, os.path.join(dir_path, f"c{c}"))
            # Fix: previously this loop variable was named `min`, shadowing the builtin.
            bucket_minute = start_minute
            total_minute = (end_minute - start_minute).total_seconds() / 60
            while bucket_minute < end_minute:
                if bucket_minute not in minute_buckets:
                    minute_buckets[bucket_minute] = {'egress_byte_Presto': 0, 'ingress_byte_Presto': 0,
                                                     'egress_byte_Spark': 0, 'ingress_byte_Spark': 0}
                if job_type == JobType.SPARK:
                    minute_buckets[bucket_minute]['egress_byte_Spark'] += egress_byte / total_minute
                    minute_buckets[bucket_minute]['ingress_byte_Spark'] += ingress_byte / total_minute
                else:
                    minute_buckets[bucket_minute]['egress_byte_Presto'] += egress_byte / total_minute
                    minute_buckets[bucket_minute]['ingress_byte_Presto'] += ingress_byte / total_minute
                bucket_minute += timedelta(minutes=1)
        logging.info(f"Egress {human_readable_size(egress_byte_Presto + egress_byte_Spark)}: "
                     f"Presto {human_readable_size(egress_byte_Presto)}, Spark {human_readable_size(egress_byte_Spark)}")
        logging.info(f"Ingress {human_readable_size(ingress_byte_Presto + ingress_byte_Spark)}: "
                     f"Presto {human_readable_size(ingress_byte_Presto)}, Spark {human_readable_size(ingress_byte_Spark)}")
        # Log period statistics
        period_logs.append({
            "start_date": start_date,
            "end_date": start_date + timedelta(days=period_day-1),
            "scheduling_policy": "size-predict",
            "c": c,
            "cloud_compute_ratio": scheduler.get_cloud_computation_ratio(),  # Store only the ratio
            "egress_byte_Presto": egress_byte_Presto,
            "ingress_byte_Presto": ingress_byte_Presto,
            "egress_byte_Spark": egress_byte_Spark,
            "ingress_byte_Spark": ingress_byte_Spark,
            "dir_path": dir_path,
            "opt_dir_path": os.path.join(dir_path, f"test_run_c{c}_bw0.20_local{100-c}{label}")
        })
    # Flush remaining minute buckets
    flush_oldest_minute_buckets(minute_buckets, None, os.path.join(dir_path, f"c{c}"))
    # Now log all stored period statistics in a single batch
    for log_entry in period_logs:
        log_period_statistics(
            log_entry["start_date"],
            log_entry["end_date"],
            log_entry["scheduling_policy"],
            log_entry["c"],
            log_entry["cloud_compute_ratio"],  # Only store ratio instead of full scheduler object
            log_entry["egress_byte_Presto"],
            log_entry["ingress_byte_Presto"],
            log_entry["egress_byte_Spark"],
            log_entry["ingress_byte_Spark"],
            log_entry["dir_path"],
            log_entry["opt_dir_path"]
        )
if __name__ == "__main__":
    # Kept for reference: one-off baseline replication run.
    # process_baseline("rep_x_month", "baselines/rep_x_month_c30_rep0.210_20250301_170015",
    # 1, 30, 0.21)
    if args.yugong:
        # Yugong mode: project-based placement; process_yugong hard-codes the
        # size-predict policy internally.
        process_yugong(args.c, args.num_week, args.opt_path, args.debug)
    else:
        # Per-policy replay (presumably process_jobs is defined earlier in the
        # file); --simple skips the per-minute traffic-rate bookkeeping.
        process_jobs(args.c, args.num_week, args.opt_path, args.debug, args.policy,
                     traffic_rate_disabled=args.simple)
    # iterate_logs(args.opt_path, args.num_week, args.yugong)
    # dollar costs (will be included in eval)
    # Moirai_weekly_cost_print(args.opt_path)