in tests.py [0:0]
def test_reorganization_cost_unaware(test_Spark: bool = True, view_mode: bool = False):
    """
    Baseline: reorganization-cost-unaware optimization.

    Runs the optimizer independently for each cloud-compute fraction from
    10% to 90% in 10% increments, appending all output to
    ``long_term/log_unaware.txt`` via a temporary stdout redirect.

    Args:
        test_Spark: If True, use Spark jobs in addition to Presto jobs
            (Oct 2024 workload window).
            If False, use only Presto jobs (Sep 2023 workload window).
        view_mode: If True, only print the parameter summary for each run;
            skip building the graph and solving.

    Raises:
        Exception: Re-raises any error after printing the traceback.
    """
    try:
        # Set up parameters
        avg_bw_usage_ratio = 0.02  # empirical value
        sample_rate = 1
        rep_budget_rate = 0.004  # empirical value
        alpha = 0.25  # assuming 10% change in a month (still aggressive)
        # Set up directories
        output_dir = "long_term"
        os.makedirs(output_dir, exist_ok=True)
        # Redirect stdout to the log file. Restore it in `finally` so that an
        # exception mid-run cannot leave stdout pointing at the file, and so
        # the file handle never leaks.
        original_stdout = sys.stdout
        log_file = open(f"{output_dir}/log_unaware.txt", "a")
        sys.stdout = log_file
        try:
            print(f"Time: {datetime.now()}", flush=True)
            reserved_bandwidth_gb = avg_bw_usage_ratio * network_capacity_gb
            # Pick the workload window matching the requested engine mix.
            if test_Spark:
                job_data_access_df, workload_print_info = prepare_df(
                    datetime.strptime("2024-10-22", "%Y-%m-%d"),
                    datetime.strptime("2024-10-28", "%Y-%m-%d"),
                    Presto=True, Spark=True)
            else:
                job_data_access_df, workload_print_info = prepare_df(
                    datetime.strptime("2023-09-08", "%Y-%m-%d"),
                    datetime.strptime("2023-09-14", "%Y-%m-%d"),
                    Presto=True, Spark=False)
            for compute_on_cloud_pct in range(10, 100, 10):
                # 5%-wide bands around the target cloud/on-prem split.
                compute_cloud_min, compute_cloud_max = compute_on_cloud_pct / 100, compute_on_cloud_pct / 100 + 0.05
                storage_on_prem_min, storage_on_prem_max = 1 - compute_on_cloud_pct / 100 - 0.05, 1 - compute_on_cloud_pct / 100
                # Skip runs whose output directory already exists (resumable).
                base_path = f"{output_dir}/test_run_c{compute_on_cloud_pct}_bw{avg_bw_usage_ratio:.2f}_local{100 - compute_on_cloud_pct}"
                if os.path.exists(base_path):
                    print(f"Skip {base_path}")
                    continue
                print(f"Running optimization for {compute_on_cloud_pct}%")
                print("----------------------------------------")
                print(f"Inputs: days=7, egress_gb={egress_gb}, storage_gb_week={storage_gb_week}, "
                      f"compute_cloud_min={compute_cloud_min}, compute_cloud_max={compute_cloud_max}, "
                      f"network_cap_gb={reserved_bandwidth_gb}, "
                      f"storage_on_prem_min={storage_on_prem_min}, storage_on_prem_max={storage_on_prem_max}")
                print(f"penalty degree alpha={alpha}")
                print("----------------------------------------", flush=True)
                if not view_mode:
                    graph = Query_on_DB_Table(
                        job_data_access_df,
                        workload_print_info,
                        'report-table-size-0907.csv' if not test_Spark else 'report-table-size-20241021.csv',
                        rep_threshold=rep_budget_rate,  # optimizer will figure out the actual budget based on the data
                        k=sample_rate,
                        log_dir=output_dir
                    )
                    graph.solve_gurobi(
                        egress_gb, storage_gb_week, compute_cloud_min, compute_cloud_max, reserved_bandwidth_gb,
                        base_path, storage_on_prem_min, storage_on_prem_max, True,
                        alpha=alpha, time_limit=24 * 60 * 60,  # 24 hours
                        p_network_gb=p_network_gb * 5,  # TODO: Hard-coded now
                    )
        finally:
            # Always restore stdout and close the log file, even on error,
            # so the traceback below reaches the real stdout.
            sys.stdout = original_stdout
            log_file.close()
    except Exception:
        # Was "test_long_term_effect" — wrong function name in the message.
        print("Error in test_reorganization_cost_unaware")
        print("Exception traceback:")
        print(traceback.format_exc())
        raise