in scheduler.py [0:0]
def process_yugong(c, num_of_week, dir_path, debug):
    """Replay weekly Presto/Spark job traces through the Yugong scheduler.

    For each of ``num_of_week`` consecutive 7-day periods starting 2024-10-29,
    the jobs returned by ``read_Presto`` / ``read_Spark`` are placed one by one
    with ``Scheduler.place_query`` (policy ``'size-predict'``, target cloud CPU
    ratio ``c / 100``).  The returned egress/ingress bytes are totalled per week
    and per engine.  They are also spread evenly over every minute a job runs,
    which builds a per-minute traffic-rate series.

    Args:
        c: Target cloud CPU share in percent (e.g. 30, 50, 70).  Used as
            ``c / 100`` for placement and in output directory names.
        num_of_week: Number of consecutive 7-day periods to replay.
        dir_path: Root output directory.
        debug: When truthy, only the first 3000 rows of each week are replayed.

    Side effects:
        * Deletes and recreates ``<dir_path>/c<c>``.
        * Calls ``setup_logger`` with ``<dir_path>/routing_c<c>.txt``.
        * Passes all per-minute buckets to ``flush_oldest_minute_buckets``
          together with ``<dir_path>/c<c>`` (presumably written there -- confirm).
        * Calls ``log_period_statistics`` once per week, after all weeks ran.
    """
    # Per-c output directory (c30, c50, c70, ...) for the traffic-rate series; start clean.
    c_dir = os.path.join(dir_path, f"c{c}")
    if os.path.exists(c_dir):
        shutil.rmtree(c_dir, ignore_errors=False)
    os.makedirs(c_dir, exist_ok=True)
    setup_logger(os.path.join(dir_path, f'routing_c{c}.txt'))

    period_day = 7
    period_start = datetime.strptime("2024-10-29", "%Y-%m-%d")
    # Weeks starting before this date size tables with the older 0907 report.
    size_report_cutoff = datetime.strptime("2024-05-13", "%Y-%m-%d")
    # The ownership report does not depend on the week, so read it only once.
    table_ownership = _load_table_ownership_pairs("report-table-size-20241021.csv")

    # minute -> per-engine byte counters; OrderedDict keeps minutes in insertion order for flushing.
    minute_buckets = OrderedDict()
    # Per-week statistics, logged in a single batch once every week is done.
    period_logs = []

    for period_offset in range(num_of_week):
        start_date = period_start + timedelta(days=period_offset * period_day)
        # After the first week, the scheduler's optimization-result directory is
        # suffixed with the previous week's start date (MMDD).
        if period_offset == 0:
            label = ""
        else:
            label = "_" + (start_date - timedelta(days=period_day)).strftime("%m%d")
        opt_dir_path = os.path.join(dir_path, f"test_run_c{c}_bw0.20_local{100 - c}{label}")

        # Header: start_time,job_id,template_id,duration,
        # uown_names,inputDataSize,cputime, type
        # TODO: this can be parallelized
        days = [start_date + timedelta(days=i) for i in range(period_day)]
        df_presto = pd.concat([read_Presto(day) for day in days])
        df_spark = pd.concat([read_Spark(day) for day in days])
        df = pd.concat([df_spark, df_presto])
        df['totalDataSize'] = df['inputDataSize'] + df['outputDataSize']
        df = df.sort_values(['start_time', 'job_id'])
        print("first 5 jobs", df.head())
        logging.info(f"Week {period_offset + 1}, starting on {start_date}")
        logging.info(f"# of jobs: {len(df['job_id'].unique())}")
        if debug:
            print("debug mode: retain first 3K rows", flush=True)
            jobs = df.head(3000).groupby(['start_time', 'job_id'])
            print(jobs.head(1))
        else:
            jobs = df.groupby(['start_time', 'job_id'])

        # Fresh ownership registry per week, populated from the cached report rows.
        ownership = Ownership()
        for table, uown_names in table_ownership:
            ownership.add_table_ownership(table, uown_names)
        table_size_path = ('report-table-size-0907.csv' if start_date < size_report_cutoff
                           else 'report-table-size-20241021.csv')
        scheduler = Scheduler(dir_path=opt_dir_path, table_size_path=table_size_path,
                              yugong=True, weight_lookup=None, ownership=ownership)

        egress_byte_Presto = 0
        ingress_byte_Presto = 0
        egress_byte_Spark = 0
        ingress_byte_Spark = 0

        for (start_time, job_id), group in jobs:
            is_spark = group['type'].iloc[0] == JobType.SPARK
            # Spark: take the first row's cputime; Presto: sum over the job's
            # rows (presumably per-table partial values -- confirm).
            cputime = group['cputime'].iloc[0] if is_spark else group['cputime'].sum()
            template_id = group['uown_names'].iloc[0]
            # One (table, input bytes, output bytes) triple per row of the job.
            table_volume_list = list(zip(group['table'], group['inputDataSize'], group['outputDataSize']))
            _, egress_byte, ingress_byte = scheduler.place_query(template_id, cputime, table_volume_list,
                                                                 policy='size-predict',
                                                                 target_cloud_cpu_ratio=c / 100,
                                                                 info=start_time)
            if is_spark:
                egress_byte_Spark += egress_byte
                ingress_byte_Spark += ingress_byte
                egress_key, ingress_key = 'egress_byte_Spark', 'ingress_byte_Spark'
            else:
                egress_byte_Presto += egress_byte
                ingress_byte_Presto += ingress_byte
                egress_key, ingress_key = 'egress_byte_Presto', 'ingress_byte_Presto'

            # Traffic rate: spread this job's bytes evenly over every minute it runs.
            time_format = "%Y-%m-%d %H:%M:%S.%f" if is_spark else "%Y-%m-%d %H:%M:%S"
            t_start = datetime.strptime(start_time, time_format)
            t_end = t_start + timedelta(seconds=group['duration'].iloc[0])
            # Always at least one minute wide, so no job's bytes are dropped.
            start_minute, end_minute = _minute_span(t_start, t_end)
            total_minute = (end_minute - start_minute).total_seconds() / 60
            egress_per_minute = egress_byte / total_minute
            ingress_per_minute = ingress_byte / total_minute
            # NOTE: buckets are only flushed once, after all weeks; incremental
            # flushing of minutes before start_minute is currently disabled.
            minute = start_minute
            while minute < end_minute:
                if minute not in minute_buckets:
                    minute_buckets[minute] = {'egress_byte_Presto': 0, 'ingress_byte_Presto': 0,
                                              'egress_byte_Spark': 0, 'ingress_byte_Spark': 0}
                minute_buckets[minute][egress_key] += egress_per_minute
                minute_buckets[minute][ingress_key] += ingress_per_minute
                minute += timedelta(minutes=1)

        logging.info(f"Egress {human_readable_size(egress_byte_Presto + egress_byte_Spark)}: "
                     f"Presto {human_readable_size(egress_byte_Presto)}, Spark {human_readable_size(egress_byte_Spark)}")
        logging.info(f"Ingress {human_readable_size(ingress_byte_Presto + ingress_byte_Spark)}: "
                     f"Presto {human_readable_size(ingress_byte_Presto)}, Spark {human_readable_size(ingress_byte_Spark)}")
        period_logs.append({
            "start_date": start_date,
            "end_date": start_date + timedelta(days=period_day - 1),
            "scheduling_policy": "size-predict",
            "c": c,
            "cloud_compute_ratio": scheduler.get_cloud_computation_ratio(),  # only the ratio, not the scheduler
            "egress_byte_Presto": egress_byte_Presto,
            "ingress_byte_Presto": ingress_byte_Presto,
            "egress_byte_Spark": egress_byte_Spark,
            "ingress_byte_Spark": ingress_byte_Spark,
            "dir_path": dir_path,
            "opt_dir_path": opt_dir_path,
        })

    # Flush every remaining minute bucket.
    flush_oldest_minute_buckets(minute_buckets, None, c_dir)
    # Log all stored period statistics in a single batch.
    for log_entry in period_logs:
        log_period_statistics(
            log_entry["start_date"],
            log_entry["end_date"],
            log_entry["scheduling_policy"],
            log_entry["c"],
            log_entry["cloud_compute_ratio"],
            log_entry["egress_byte_Presto"],
            log_entry["ingress_byte_Presto"],
            log_entry["egress_byte_Spark"],
            log_entry["ingress_byte_Spark"],
            log_entry["dir_path"],
            log_entry["opt_dir_path"],
        )


def _minute_span(t_start, t_end):
    """Return the half-open ``[start_minute, end_minute)`` window a job occupies.

    ``start_minute`` is ``t_start`` floored to the minute.  ``end_minute`` is
    ``t_end`` rounded up to the minute, where any leftover seconds or
    microseconds count.  The window is always at least one minute wide, so a
    zero-length job starting exactly on a minute boundary still has a bucket
    to receive its bytes.
    """
    start_minute = t_start.replace(second=0, microsecond=0)
    end_minute = t_end.replace(second=0, microsecond=0)
    if end_minute < t_end:
        end_minute += timedelta(minutes=1)
    return start_minute, max(end_minute, start_minute + timedelta(minutes=1))


def _load_table_ownership_pairs(csv_path):
    """Return ``(database.table, uown_names)`` pairs from a table-size report CSV.

    The table name is ``hive_database_name + '.' + hive_table_name``.  Rows
    with a missing ``uown_names`` value are skipped.
    """
    table_df = pd.read_csv(csv_path,
                           dtype={'hive_database_name': str, 'hive_table_name': str, 'uown_names': str},
                           na_values=['\\N'])
    tables = table_df['hive_database_name'] + '.' + table_df['hive_table_name']
    return [(table, uown_names)
            for table, uown_names in zip(tables, table_df['uown_names'])
            if not pd.isna(uown_names)]