in tests.py [0:0]
import os
import sys
import traceback
from datetime import datetime, timedelta


def test_sample_k(sample_rate: float, compute_on_cloud_pct: int = 30, test_Spark: bool = True,
rep_budget_rate: float = 0.004, rep_strategy: str = "job_access_density",
num_weeks: int = 2
):
"""
Given sample ratio, compute on cloud (%), avg bandwidth usage ratio of 800Gbps, and replication budget of total data
Parameters:
- sample_rate (float): Sample rate of top cost-sensitive jobs.
- compute_on_cloud (int): Percentage of resources allocated (suggested to be in [30, 50, 70]).
- test_Spark (bool): If True, use Spark traces from 2024-2025 (>100 days) along with Presto traces in the same period.
If False, use Presto traces from 2023-2024 (>200 days).
- rep_budget (float): Replication budget constraint (percentage of total data)
- rep_strategy (str): Selection strategy in pre-selecting process (default: "job_access_density")
Functionality:
1. Validates input parameters.
2. Sets up output directories.
3. Initializes optimization parameters.
4. Iterates through weekly data and solves the optimization problem.
"""
    original_stdout = sys.stdout  # saved up front so the except block can always restore it
    try:
        # Validate input
        assert compute_on_cloud_pct in [30, 50, 70], "compute_on_cloud_pct must be one of [30, 50, 70]"
        # Set up parameters (not expected to change)
        # - avg_bw_usage_ratio (float): fraction of network bandwidth dedicated to Moirai on average
        avg_bw_usage_ratio = 0.02  # empirical value
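        # For scale: with the ~800 Gbps fabric noted in the docstring, a 0.02 ratio
        # amounts to roughly 16 Gbps of average bandwidth reserved for Moirai.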
# Set up directories
output_dir = f"sample_{sample_rate:.3f}"
os.makedirs(output_dir, exist_ok=True)
        # Redirect stdout to a per-configuration log file
        if rep_strategy != "job_access_density":
            sys.stdout = open(f"{output_dir}/log_c{compute_on_cloud_pct}_{rep_strategy}.txt", "a")
        else:
            sys.stdout = open(f"{output_dir}/log_c{compute_on_cloud_pct}.txt", "a")
print(f"Time: {datetime.now()}", flush=True)
reserved_bandwidth_gb = avg_bw_usage_ratio * network_capacity_gb
        # Compute placement and storage constraints: each window spans 5 percentage points.
        # For example, if compute_on_cloud_pct = 30, then compute_cloud lies in [0.30, 0.35]
        # and storage_on_prem in [0.65, 0.70].
        compute_cloud_min, compute_cloud_max = compute_on_cloud_pct / 100, compute_on_cloud_pct / 100 + 0.05
        storage_on_prem_min, storage_on_prem_max = 1 - compute_on_cloud_pct / 100 - 0.05, 1 - compute_on_cloud_pct / 100
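        # Worked illustration (hypothetical value): compute_on_cloud_pct = 50 would give
        # compute_cloud in [0.50, 0.55] and storage_on_prem in [0.45, 0.50].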
        # Build the base output path for the initial (week-0) run
if rep_strategy != "job_access_density":
base_path = f"{output_dir}/test_run_c{compute_on_cloud_pct}_bw{avg_bw_usage_ratio:.2f}_local{100 - compute_on_cloud_pct}_{rep_strategy}"
else:
base_path = f"{output_dir}/test_run_c{compute_on_cloud_pct}_bw{avg_bw_usage_ratio:.2f}_local{100 - compute_on_cloud_pct}"
last_dir = base_path # Track last processed directory
        # Initialize the graph only if not in view mode (view mode just prints the
        # output paths for a sanity check); `args` is expected to be a module-level namespace
        view_mode = args.view
graph = None
if test_Spark:
job_data_access_df, workload_print_info = prepare_df(datetime.strptime("2024-10-22", "%Y-%m-%d"),
datetime.strptime("2024-10-28", "%Y-%m-%d"),
Presto=True, Spark=True)
else:
job_data_access_df, workload_print_info = prepare_df(datetime.strptime("2023-09-08", "%Y-%m-%d"),
datetime.strptime("2023-09-14", "%Y-%m-%d"),
Presto=True, Spark=False)
if not view_mode:
graph = Query_on_DB_Table(
job_data_access_df,
workload_print_info,
                'report-table-size-20241021.csv' if test_Spark else 'report-table-size-0907.csv',
rep_threshold=rep_budget_rate, # optimizer will figure out the actual budget based on the data
rep_strategy=rep_strategy,
k=sample_rate,
log_dir=output_dir
)
        # Run the first optimization if not already completed (skipped in view mode,
        # where `graph` is None)
        if not view_mode and not os.path.exists(base_path):
graph.solve_gurobi(
egress_gb, storage_gb_week, compute_cloud_min, compute_cloud_max, reserved_bandwidth_gb,
base_path, storage_on_prem_min, storage_on_prem_max, True,
alpha=1, time_limit=30 * 24 * 60 * 60, # 30 days
p_network_gb=p_network_gb * 5, # TODO: Hard-coded now
)
# Verify the placement file
placement_file = os.path.join(base_path, "dataset_placement.csv")
assert os.path.exists(placement_file), f"File not found: {placement_file}"
previous_placement = placement_file
        # Process subsequent traces week by week, starting right after the initial window
        if test_Spark:
            period_start = datetime.strptime("2024-10-29", "%Y-%m-%d")
        else:
            period_start = datetime.strptime("2023-09-15", "%Y-%m-%d")
        # num_weeks is taken from the function signature (previously read from args.num_week)
for week_offset in range(num_weeks):
start_date = period_start + timedelta(weeks=week_offset)
end_date = start_date + timedelta(days=6)
label = start_date.strftime("%m%d")
if rep_strategy != "job_access_density":
output_path = f"{output_dir}/test_run_c{compute_on_cloud_pct}_bw{avg_bw_usage_ratio:.2f}_local{100 - compute_on_cloud_pct}_{rep_strategy}_{label}"
else:
output_path = f"{output_dir}/test_run_c{compute_on_cloud_pct}_bw{avg_bw_usage_ratio:.2f}_local{100 - compute_on_cloud_pct}_{label}"
if os.path.exists(output_path):
previous_placement = os.path.join(output_path, "dataset_placement.csv")
print(f"Skip {output_path}")
continue
print(f"Previous placement: {previous_placement}", flush=True)
job_data_access_df, workload_print_info = prepare_df(start_date, end_date, Presto=True, Spark=test_Spark)
if not view_mode:
# Restore database table states from previous placement
graph.restore_unique_db_tables(previous_placement, log_dir=last_dir)
# Update the workload with the new access trace
graph.update_workload(job_data_access_df, workload_print_info, log_dir=last_dir)
# Update the previous placement
graph.update_previous_placement(previous_placement)
            # Optimization parameters
            alpha = 1  # penalty degree for switching a table's placement
print(f"Running optimization for week starting on {label}")
print("----------------------------------------")
print(f"Inputs: days=7, egress_gb={egress_gb}, storage_gb_week={storage_gb_week}, "
f"compute_cloud_min={compute_cloud_min}, compute_cloud_max={compute_cloud_max}, "
f"network_cap_gb={reserved_bandwidth_gb}, "
f"storage_on_prem_min={storage_on_prem_min}, storage_on_prem_max={storage_on_prem_max}")
print(f"penalty degree alpha={alpha}")
print("----------------------------------------", flush=True)
# Solve optimization problem for this period
if not view_mode:
graph.solve_gurobi(
egress_gb, storage_gb_week, compute_cloud_min, compute_cloud_max, reserved_bandwidth_gb,
output_path, storage_on_prem_min, storage_on_prem_max, True,
alpha=alpha, time_limit=24 * 60 * 60,
p_network_gb=p_network_gb * 5, # TODO: Hard-coded now
)
# Update the previous placement for the next iteration
previous_placement = os.path.join(output_path, "dataset_placement.csv")
last_dir = output_path
        # Close the log file and restore stdout
        sys.stdout.close()
        sys.stdout = original_stdout
    except Exception:
        # Restore stdout first so the error shows up on the console rather than in the log file
        if sys.stdout is not original_stdout:
            sys.stdout.close()
            sys.stdout = original_stdout
        print(f"Error in test_sample_k with sample_rate={sample_rate}, compute_on_cloud_pct={compute_on_cloud_pct}")
        print("Exception traceback:")
        print(traceback.format_exc())
        raise