in scheduler.py [0:0]
def log_period_statistics(start_date: datetime, end_date: datetime, scheduling_policy: str, c: int,
                          cloud_compute_rate: float,
                          egress_byte_Presto, ingress_byte_Presto,
                          egress_byte_Spark, ingress_byte_Spark,
                          dir_path, opt_dir_path, traffic_rate_disabled=False,
                          rep_rate=None
                          ):
    """Append one CSV row of summarized statistics for a scheduling period.

    Writes to ``<dir_path>/log.csv``, emitting the column header first when the
    file does not yet exist, under an exclusive ``fcntl`` lock so concurrent
    writers do not interleave rows.

    Args:
        start_date, end_date: period boundaries; rendered as ``YYYYMMDD-YYYYMMDD``.
        scheduling_policy: policy name recorded in the ``mode`` column.
        c: cloud computation target; also selects the ``c<c>`` traffic subdirectory.
        cloud_compute_rate: achieved cloud computation ratio (fraction in [0, 1]).
        egress_byte_Presto, ingress_byte_Presto: Presto traffic byte counters.
        egress_byte_Spark, ingress_byte_Spark: Spark traffic byte counters.
        dir_path: base directory holding traffic logs and ``log.csv``.
        opt_dir_path: optimizer output directory; ``None`` indicates a baseline
            run whose replication size is derived from ``rep_rate`` instead.
        traffic_rate_disabled: when True, skip traffic-percentile computation
            (the P90/P95/P99 columns are logged as ``None``).
        rep_rate: replication rate for baseline runs; must be provided when
            ``opt_dir_path`` is ``None``.
    """
    period_str = f"{start_date.strftime('%Y%m%d')}-{end_date.strftime('%Y%m%d')}"
    cloud_computation_rate_str = f"{cloud_compute_rate*100:.2f}%"  # Get actual ratio from scheduler
    cloud_computation_target = c
    # Extract movement bytes
    if opt_dir_path is None:
        # Baseline run: no optimizer log, so movement is zero and the replicated
        # size is rep_rate times the total dataset size.
        # NOTE(review): raises TypeError if rep_rate is None here — callers must
        # pass rep_rate whenever opt_dir_path is None; verify at call sites.
        movement_ingress = movement_egress = 0
        rep_size = rep_rate * parse_size("299.12PB")
        sample_rate = 1.0
    else:
        movement_ingress, movement_egress, rep_size, sample_rate = extract_movement_rep_and_sample(
            os.path.join(opt_dir_path, "log.txt"))
    if not traffic_rate_disabled:
        # Compute traffic percentiles
        if rep_rate is not None:
            # Baselines keep traffic logs directly under dir_path.
            print("Evaluating baselines")
            traffic_dir = dir_path
        else:
            traffic_dir = os.path.join(dir_path, f"c{c}")
        p90, p95, p99 = calculate_traffic_percentiles(traffic_dir, start_date, end_date)
    else:
        p90 = p95 = p99 = None
    # Format log line
    log_entry = (f"{period_str},{scheduling_policy},{cloud_computation_rate_str},{cloud_computation_target},"
                 f"{ingress_byte_Presto},{egress_byte_Presto},"
                 f"{ingress_byte_Spark},{egress_byte_Spark},"
                 f"{p90},{p95},{p99},{movement_ingress},{movement_egress},"
                 f"{rep_size},{sample_rate}\n")
    log_file = os.path.join(dir_path, "log.csv")
    write_header = not os.path.exists(log_file)  # Write header if file does not exist
    try:
        with open(log_file, "a") as f:
            fcntl.flock(f, fcntl.LOCK_EX)  # Lock file to prevent interference
            try:
                if write_header:
                    f.write(
                        "period,mode,cloud_computation_ratio,cloud_computation_target,"
                        "ingress_byte_Presto,egress_byte_Presto,"
                        "ingress_byte_Spark,egress_byte_Spark,"
                        "P90_traffic_bps,P95_traffic_bps,P99_traffic_bps,"
                        "movement_ingress_bytes,movement_egress_bytes,rep_bytes,sample_rate\n")
                f.write(log_entry)
            finally:
                # Release the lock even if a write fails, so other writers
                # are not blocked until this process exits.
                fcntl.flock(f, fcntl.LOCK_UN)
        logging.info("Logged period statistics to log.csv: %s", log_entry.strip())
    except Exception as e:
        logging.error("Error writing to log.csv: %s", e)