# baselines.py
import argparse
import os
from datetime import datetime
from enum import Enum
from random import random
import numpy as np
import pandas as pd
from utility import human_readable_size
"""
Eventual goal is to support three baselines:
1. No Rep or Rep recent X months of data, parameter: X
* Tables are randomly split into two sites, but consider on-prem to have storage space reserved for X months of data
* For example, if replicated data is 21% of total data and split is 50% on-prem, 50% cloud, then
** on-prem has 29% of unique data, cloud has 50% of unique data, and 21% of data are replicated
** and when generating placement, we allocate 50%/121% of data to on-prem and 71%/121% of data to cloud
2. rep top access density tables, parameter: replication budget
* Rank the tables by access density (read+write access size/table size), and replicate the top tables until the replication budget is exhausted
* Other tables are randomly split into two sites
* For example, if replicated data is 5% of total data and split is 50% on-prem, 50% cloud, then
** on-prem has 45% of unique data, cloud has 50% of unique data, and 5% of data are replicated
** and when generating placement, we allocate 45%/95% of data to on-prem and 50%/95% of data to cloud
3. optimized data placement based on Moirai job distribution
* Data should be placed to the site where most jobs depending on them are running
"""
# ---------------------------------------------------------------------------
# Command-line interface.
# NOTE: arguments are parsed eagerly at import time; the sweep under
# __main__ currently ignores them (see the commented-out call at the
# bottom of the file).
# ---------------------------------------------------------------------------
parser = argparse.ArgumentParser(description="Data placement for baselines")
parser.add_argument(
    "--baseline",
    type=str,
    help="baseline to run",
    choices=["rep_rtd", "volley", "rep_x_month"],
)
# parser.add_argument("--rep_strategy", type=str, help="scheduling policy",
#                     choices=["rep_x_month", "rep_rtd"])
# parser.add_argument("--placement_strategy", type=str, help="placement strategy",
#                     choices=["random", "volley"])
parser.add_argument(
    "--rep_rate",
    type=float,
    help="Pre-selecting replication budget rate, [0, 1]",
)
parser.add_argument(
    "--c",
    type=int,
    default=30,
    help="Portion of compute to cloud",
)
args = parser.parse_args()
class Status(Enum):
    """Placement decision for a table.

    The integer values are relied on elsewhere in this file (e.g. the 0/1
    site codes in the compute-distribution dataframes), so they must not
    be renumbered.
    """
    ONPREM = 0  # stored only on-premises
    CLOUD = 1   # stored only in the cloud
    REP = 2     # replicated to both sites
class baselines:
    """Table-placement baselines for a two-site (on-prem + cloud) setup.

    Each instance runs ONE baseline end to end inside __init__: it loads
    table sizes and the workload trace, assigns a Status to every table,
    and persists the result to
    baselines/<tag>_c<cloud_target>_rep<rep_rate>_<timestamp>/dataset_placement.csv.

    Supported tags:
      * "rep_x_month" — random split; rep_rate of every table is assumed
        replicated to both sites ("recent X months" of data).
      * "rep_rtd"     — replicate the top access-density tables up to the
        budget, then split the rest randomly.
      * "MoiJob"      — place tables by Moirai's per-job site assignment.
      * "volley_new"  — place tables by a random project-level compute split.
    """

    def __init__(self, tag: str, cloud_target: int, rep_rate: float):
        """Run baseline `tag`.

        :param tag: baseline name, one of the four listed on the class.
        :param cloud_target: percent of compute (and data capacity) in the
            cloud, in [0, 100]; on-prem capacity is the complement.
        :param rep_rate: replication budget as a fraction of total data size.
        :raises ValueError: if `tag` is unknown.
        """
        self.parent_dir = "baselines"
        os.makedirs(self.parent_dir, exist_ok=True)
        self.tag = tag
        # Timestamped output directory so repeated runs never collide.
        my_dir_name = f"{tag}_c{cloud_target}_rep{rep_rate:.3f}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        self.my_dir = os.path.join(self.parent_dir, my_dir_name)
        os.makedirs(self.my_dir, exist_ok=True)
        self.cloud_target = cloud_target
        assert 0 <= cloud_target <= 100, "cloud_target must be in [0, 100]"
        self.rep_rate = rep_rate
        self.table_size_lookup = self._load_table_size()
        self.total_data_size = sum(self.table_size_lookup.values())
        # Bytes already committed to on-prem storage; replicas count here too.
        self.on_prem_data_size = 0
        self.on_prem_capacity = self.total_data_size * (100 - cloud_target) / 100
        # header: abstractFingerPrint,db_name,table_name,inputDataSize,outputDataSize
        self.workload = self._load_workload()
        self.placement = {}  # db_table name -> Status
        if tag == "rep_x_month":
            # Only the 3-month setting is calibrated (presumably "3M" means
            # 3 months ~= 21% of data — TODO confirm).
            assert rep_rate in [0.21] # 3M
            self.on_prem_data_size = self.total_data_size * rep_rate
            assert self.on_prem_data_size <= self.on_prem_capacity, "Not enough capacity for replication"
            self.rep_x_month_placement()
        elif tag == "rep_rtd":
            self.preselect_replication()
            self.data_placement_random()
        elif tag == "MoiJob":
            self.preselect_replication()
            df = self.get_moirai_job_distribution()
            self.data_placement_by_compute_distribution(df)
            #self.volley_placement()
        elif tag == "volley_new":
            self.preselect_replication()
            df = self.get_random_project_distribution()
            self.data_placement_by_compute_distribution(df)
        else:
            raise ValueError(f"Unknown baseline: {tag}")
        self.persist_placement()

    def _load_table_size(self):
        """Load per-table sizes from the table-size report CSV.

        :returns: dict mapping "db.table" -> dir_size (bytes, int); only
            tables with a positive dir_size are kept, in shuffled order.
        """
        df = pd.read_csv("report-table-size-20241021.csv")
        required_columns = {"hive_database_name", "hive_table_name", "dir_size"}
        assert required_columns.issubset(df.columns), f"Missing columns: {required_columns - set(df.columns)}"
        df['hive_database_name'] = df['hive_database_name'].astype(str)
        df['hive_table_name'] = df['hive_table_name'].astype(str)
        df['dir_size'] = df['dir_size'].astype(int)
        df = df[df['dir_size'] > 0]
        table_size_lookup = {}
        # random shuffle to avoid bias: placement methods iterate this dict in
        # insertion order, so shuffling here randomizes table visit order.
        df = df.sample(frac=1)
        for row in df.itertuples():
            lookup_key = row.hive_database_name + "." + row.hive_table_name
            value = row.dir_size
            table_size_lookup[lookup_key] = value
        print("total data size", human_readable_size(df['dir_size'].sum()))
        return table_size_lookup

    def _load_workload(self):
        """Concatenate the Presto and Spark access traces (cputime dropped)."""
        df_presto = pd.read_csv("newTraces/report-abFP-volume-table-20241022-20241028-Presto.csv")
        df_spark = pd.read_csv("newTraces/report-abFP-volume-table-20241022-20241028-Spark.csv")
        job_data_access_df = pd.concat([df_presto, df_spark], ignore_index=True)
        job_data_access_df.drop(columns=["cputime"], inplace=True)
        return job_data_access_df

    def preselect_replication(self):
        """Mark the highest access-density tables as Status.REP until the
        replication budget (rep_rate * total_data_size) is spent.

        Access density = (input + output access size) / table size.
        """
        df = self.workload.copy()
        df['access_size'] = df['inputDataSize'] + df['outputDataSize']
        df['db_table'] = df['db_name'] + "." + df['table_name']
        df = df.groupby('db_table', as_index=False)['access_size'].sum()
        # Only tables with a known size can be considered for replication.
        df = df[df['db_table'].isin(self.table_size_lookup)]
        df['access_density'] = df['access_size'] / df['db_table'].map(self.table_size_lookup)
        df = df.sort_values(by='access_density', ascending=False)
        total_rep_size = 0
        for _, row in df.iterrows():
            table = row['db_table']
            table_size = self.table_size_lookup.get(table, 0)
            if total_rep_size >= self.total_data_size * self.rep_rate:
                break
            # 1 GiB of slack lets the budget be slightly overshot; a table
            # that would exceed the slack is skipped (not replicated) and the
            # scan continues with smaller tables.
            elif total_rep_size + table_size <= self.total_data_size * self.rep_rate + 1024**3: # 1GB buffer
                self.placement[table] = Status.REP
                total_rep_size += table_size
        print(f"replicated data size: {human_readable_size(total_rep_size)}")
        # The on-prem copy of each replica consumes on-prem capacity.
        self.on_prem_data_size += total_rep_size

    def rep_x_month_placement(self):
        """Randomly split unplaced tables between sites for the
        "replicate recent X months" baseline.

        Each table only occupies (1 - rep_rate) of its size at its home
        site, because rep_rate of it is assumed replicated to both sites
        (already charged to on_prem_data_size in __init__).
        """
        on_prem_capacity_left = self.on_prem_capacity - self.on_prem_data_size
        # NOTE(review): the replicated volume is subtracted from the cloud
        # budget here but NOT in data_placement_random() — confirm which
        # accounting is intended.
        cloud_capacity_left = self.total_data_size - self.on_prem_capacity - self.on_prem_data_size
        print(f"on-prem capacity left: {human_readable_size(on_prem_capacity_left)}, "
              f"cloud capacity left: {human_readable_size(cloud_capacity_left)}")
        # Pick sites in proportion to the remaining capacities.
        prob_on_prem = on_prem_capacity_left / (on_prem_capacity_left + cloud_capacity_left)
        placed_tables = self.placement.keys()
        for table, size in self.table_size_lookup.items():
            # Only the non-replicated fraction is site-specific.
            effective_size = size * (1-self.rep_rate)
            if table in placed_tables:
                continue
            if on_prem_capacity_left > effective_size and cloud_capacity_left > effective_size:
                # Both sites fit it: choose randomly, biased by capacity.
                if random() < prob_on_prem:
                    self.placement[table] = Status.ONPREM
                    on_prem_capacity_left -= effective_size
                else:
                    self.placement[table] = Status.CLOUD
                    cloud_capacity_left -= effective_size
            elif on_prem_capacity_left > effective_size:
                self.placement[table] = Status.ONPREM
                on_prem_capacity_left -= effective_size
            elif cloud_capacity_left > effective_size:
                self.placement[table] = Status.CLOUD
                cloud_capacity_left -= effective_size
            else:
                # Neither site has room: overflow to the cloud (its budget may
                # go negative).
                # print(f"Table {table} ({human_readable_size(size)}) is larger than remaining capacity "
                #       f"on-prem ({human_readable_size(on_prem_capacity_left)}) "
                #       f"and cloud ({human_readable_size(cloud_capacity_left)})")
                self.placement[table] = Status.CLOUD
                cloud_capacity_left -= effective_size
        self.on_prem_data_size = self.on_prem_capacity - on_prem_capacity_left
        print(f"on-prem data size: {human_readable_size(self.on_prem_data_size)}")

    def data_placement_random(self):
        """Randomly split the not-yet-placed tables between the two sites,
        in proportion to each site's remaining capacity.

        Unlike rep_x_month_placement(), full table sizes are charged here
        (replicated tables were already skipped via self.placement).
        """
        on_prem_capacity_left = self.on_prem_capacity - self.on_prem_data_size
        # NOTE(review): pre-selected replicas are not charged against the
        # cloud budget here (contrast with rep_x_month_placement) — confirm.
        cloud_capacity_left = self.total_data_size - self.on_prem_capacity
        print(f"on-prem capacity left: {human_readable_size(on_prem_capacity_left)}, "
              f"cloud capacity left: {human_readable_size(cloud_capacity_left)}")
        prob_on_prem = on_prem_capacity_left / (on_prem_capacity_left + cloud_capacity_left)
        placed_tables = self.placement.keys()
        for table, size in self.table_size_lookup.items():
            if table in placed_tables:
                continue
            if on_prem_capacity_left > size and cloud_capacity_left > size:
                # Both sites fit it: choose randomly, biased by capacity.
                if random() < prob_on_prem:
                    self.placement[table] = Status.ONPREM
                    on_prem_capacity_left -= size
                else:
                    self.placement[table] = Status.CLOUD
                    cloud_capacity_left -= size
            elif on_prem_capacity_left > size:
                self.placement[table] = Status.ONPREM
                on_prem_capacity_left -= size
            elif cloud_capacity_left > size:
                self.placement[table] = Status.CLOUD
                cloud_capacity_left -= size
            else:
                # Neither site has room: overflow to the cloud.
                # print(f"Table {table} ({human_readable_size(size)}) is larger than remaining capacity "
                #       f"on-prem ({human_readable_size(on_prem_capacity_left)}) "
                #       f"and cloud ({human_readable_size(cloud_capacity_left)})")
                self.placement[table] = Status.CLOUD
                cloud_capacity_left -= size
        self.on_prem_data_size = self.on_prem_capacity - on_prem_capacity_left
        print(f"on-prem data size: {human_readable_size(self.on_prem_data_size)}")

    def get_moirai_job_distribution(self):
        """Join Moirai's per-job site assignment onto the workload.

        Reads baselines/query_placement_c<cloud_target>.csv (columns abFP, y)
        and returns the workload rows whose fingerprint has an assignment,
        with that assignment in a new 'Status' column (rows without one are
        dropped).

        :raises FileNotFoundError: if the per-target placement CSV is missing.
        """
        Moirai_path = f"baselines/query_placement_c{self.cloud_target}.csv"
        if not os.path.exists(Moirai_path):
            raise FileNotFoundError(f"Path {Moirai_path} not found")
        # header: abFP,y
        job_dist = pd.read_csv(Moirai_path)
        placement_map = dict(zip(job_dist['abFP'], job_dist['y']))
        df = self.workload.copy()
        df['Status'] = df['abstractFingerPrint'].map(placement_map)
        df = df[df['Status'].notnull()]
        return df

    def get_random_project_distribution(self):
        """Randomly bin whole jobs (fingerprints) to sites by CPU time.

        Fingerprints are visited in random order; each is sent to the cloud
        (Status 1) while its total cputime still fits the cloud quota
        (cloud_target% of total cputime), otherwise on-prem (Status 0).

        :returns: the yugong trace dataframe with a 'Status' column added.
        """
        assert os.path.exists("yugongTraces/report-uown-volume-table-20241022-20241028.csv")
        # header: abstractFingerPrint, db_name, table_name, inputDataSize, outputDataSize, cputime
        df = pd.read_csv("yugongTraces/report-uown-volume-table-20241022-20241028.csv")
        prob_in_cloud = self.cloud_target / 100
        # Get unique abstractFingerPrint values
        unique_abFPs = df['abstractFingerPrint'].unique()
        total_cputime = df['cputime'].sum()
        cloud_quota = total_cputime * prob_in_cloud
        onprem_quota = total_cputime * (1 - prob_in_cloud)
        shuffled_abFPs = np.random.permutation(unique_abFPs)
        abFP_status_map = {}
        for abFP in shuffled_abFPs:
            # TODO(perf): this rescans the whole frame per fingerprint
            # (O(rows * fingerprints)); a single
            # groupby('abstractFingerPrint')['cputime'].sum() would suffice.
            cputime = df[df['abstractFingerPrint'] == abFP]['cputime'].sum()
            if cputime <= cloud_quota:
                abFP_status_map[abFP] = 1
                cloud_quota -= cputime
            else:
                abFP_status_map[abFP] = 0
                # NOTE(review): onprem_quota is decremented but never checked,
                # so on-prem can be over-assigned.
                onprem_quota -= cputime
        # # Assign Status randomly with probability prob_in_cloud for being 1 (in cloud)
        # abFP_status_map = {abFP: np.random.choice([0, 1], p=[1 - prob_in_cloud, prob_in_cloud]) for abFP in
        #                    unique_abFPs}
        # Map the assigned Status to the dataframe
        df['Status'] = df['abstractFingerPrint'].map(abFP_status_map)
        return df

    def data_placement_by_compute_distribution(self, df):
        """Place each table at the site where most of its traffic originates.

        :param df: workload rows annotated with a per-job 'Status' column
            (0 = on-prem, 1 = cloud).

        A table goes on-prem when its on-prem traffic dominates AND capacity
        allows; otherwise to the cloud. Tables absent from `df` (no recorded
        accesses) are then packed on-prem while capacity lasts, overflowing
        to the cloud.
        """
        df['access_size'] = df['inputDataSize'] + df['outputDataSize']
        grouped = df.groupby(['db_name', 'table_name', 'Status'], as_index=False)['access_size'].sum()
        # grouped_map: "db.table" -> {ONPREM: traffic, CLOUD: traffic}
        grouped_map = {}
        for _, row in grouped.iterrows():
            key = f"{row['db_name']}.{row['table_name']}"
            grouped_map.setdefault(key, {Status.ONPREM: 0, Status.CLOUD: 0})
            if row['Status'] == 0:
                grouped_map[key][Status.ONPREM] = row['access_size']
            else:
                grouped_map[key][Status.CLOUD] = row['access_size']
        onprem_size = 0
        cloud_size = 0
        for table_key, traffic in grouped_map.items():
            onprem_traffic = traffic[Status.ONPREM]
            cloud_traffic = traffic[Status.CLOUD]
            table_size = self.table_size_lookup.get(table_key, 0)
            # Skip tables already placed (replicas) or of unknown/zero size.
            if table_key in self.placement or table_size == 0:
                continue
            if onprem_traffic > cloud_traffic and self.on_prem_data_size + onprem_size + table_size <= self.on_prem_capacity:
                self.placement[table_key] = Status.ONPREM
                onprem_size += table_size
            else:
                self.placement[table_key] = Status.CLOUD
                cloud_size += table_size
        print(f"on-prem new data size: {human_readable_size(onprem_size)}, "
              f"cloud new data size: {human_readable_size(cloud_size)}")
        self.on_prem_data_size += onprem_size
        print(f"on-prem data size: {human_readable_size(self.on_prem_data_size)}")
        # Second pass: tables with no traffic in df get packed on-prem first.
        decisions = self.placement.keys()
        for table, table_size in self.table_size_lookup.items():
            if table not in decisions:
                if table_size + self.on_prem_data_size <= self.on_prem_capacity:
                    self.placement[table] = Status.ONPREM
                    self.on_prem_data_size += table_size
                else:
                    self.placement[table] = Status.CLOUD
                    cloud_size += table_size
        print(f"on-prem data size: {human_readable_size(self.on_prem_data_size)}")
        print(f"cloud data size: {human_readable_size(cloud_size)}")
        print(f"total data size: {human_readable_size(self.on_prem_data_size + cloud_size)}")
        print(f"=====================")
        # print(f"# of tables: {len(self.placement)}")
        # total_data_size = 0
        # onprem_size = 0
        # rep_size = 0
        # cloud_size = 0
        # for table, status in self.placement.items():
        #     total_data_size += self.table_size_lookup.get(table, 0)
        #     if status == Status.ONPREM:
        #         onprem_size += self.table_size_lookup.get(table, 0)
        #     elif status == Status.REP:
        #         rep_size += self.table_size_lookup.get(table, 0)
        #     elif status == Status.CLOUD:
        #         cloud_size += self.table_size_lookup.get(table, 0)
        # print("Sanity Check:")
        # print(f"on-prem data size: {human_readable_size(onprem_size)}")
        # print(f"replicated data size: {human_readable_size(rep_size)}")
        # print(f"cloud data size: {human_readable_size(cloud_size)}")
        # print(f"total data size: {human_readable_size(total_data_size)}")

    def persist_placement(self):
        """Write the placement to <my_dir>/dataset_placement.csv.

        Row encoding (z, w): ONPREM -> (0, 1), CLOUD -> (1, 0),
        REP -> (0, 0); size is in GiB. (Presumably z/w are the cloud/on-prem
        indicator variables consumed downstream — TODO confirm.)
        Tables missing from table_size_lookup are silently omitted.
        """
        with open(os.path.join(self.my_dir, "dataset_placement.csv"), "w") as f:
            f.write("table,z,w,size\n")
            for table, status in self.placement.items():
                if status == Status.ONPREM:
                    z, w = 0, 1
                elif status == Status.CLOUD:
                    z, w = 1, 0
                else:
                    z, w = 0, 0
                size = self.table_size_lookup.get(table, None)
                if size is None:
                    #print(f"Table {table} not found in table_size_lookup")
                    continue
                f.write(f"{table},{z},{w},{size/1024**3}\n")
if __name__ == "__main__":
    # Sweep every baseline over the same set of cloud compute targets.
    compute_targets = (30, 50, 70)
    for target in compute_targets:
        baselines("rep_x_month", target, 0.21)
    for target in compute_targets:
        for budget in (0, 0.025):  # 0.002, 0.01, 0.025, 0.05
            baselines("rep_rtd", target, budget)
    for target in compute_targets:
        baselines("MoiJob", target, 0.002)
    # Randomized baseline: repeat several trials to average out the noise.
    for _trial in range(6):
        for target in compute_targets:
            for budget in (0,):  # , 0.002, 0.01, 0.025, 0.05
                baselines("volley_new", target, budget)
    # baseline = baselines(args.baseline, args.c, args.rep_rate)