def process_jobs()

in scheduler.py [0:0]


def process_jobs(c, num_of_week, dir_path, debug, policy, traffic_rate_disabled=False):
    # create c30 or c50 or c70 directory under parent to store traffic_rate
    if os.path.exists(os.path.join(dir_path, f"c{c}")) and not traffic_rate_disabled:
        shutil.rmtree(os.path.join(dir_path, f"c{c}"), ignore_errors=False)
    os.makedirs(f"{dir_path}/c{c}", exist_ok=True)
    period_day = 7

    setup_logger(os.path.join(dir_path, f'routing_c{c}_{policy}.txt'))
    logging.info(f"Start processing jobs with c={c}, num_of_week={num_of_week}, dir_path={dir_path}, debug={debug}, policy={policy}, traffic_rate_disabled={traffic_rate_disabled}")
    # prepare scheduler
    period_start = datetime.strptime("2024-10-22", "%Y-%m-%d")
    logging.info(f"Preparing the first df starting from {period_start}")
    # Header: start_time: str,job_id,template_id,duration,
    # uown_names,inputDataSize,outputDataSize,cputime, type
    df_presto = pd.concat([read_Presto(period_start + timedelta(days=i)) for i in range(period_day)])
    df_spark = pd.concat([read_Spark(period_start + timedelta(days=i)) for i in range(period_day)])
    df = pd.concat([df_spark, df_presto])
    df['totalDataSize'] = df['inputDataSize'] + df['outputDataSize']
    # df = df.sort_values(['datetime', 'job_id'])
    weight_group = df.groupby(['table']).agg(
            totalDataSize=('totalDataSize', 'mean')).reset_index()
    weight_lookup = weight_group.set_index('table').to_dict()['totalDataSize']

    logging.info(f"# of jobs: {len(df['job_id'].unique())}")

    period_start = period_start + timedelta(days=period_day)

    """ to calculate traffic rate per minute, """
    minute_buckets = OrderedDict()  # OrderedDict keeps minute order for easy popping

    # store logs for each period
    period_logs = []

    for period_offset in range(num_of_week):
        start_date = period_start + timedelta(days=period_offset * period_day)
        if period_offset == 0:
            label = ""
        else:
            label = "_" + (start_date - timedelta(days=period_day)).strftime("%m%d")

        # Header: start_time,job_id,template_id,duration,
        # uown_names,inputDataSize,cputime, type
        # TODO: this can be parallelized
        df_presto = pd.concat([read_Presto(start_date + timedelta(days=i)) for i in range(period_day)])
        df_spark = pd.concat([read_Spark(start_date + timedelta(days=i)) for i in range(period_day)])

        df = pd.concat([df_spark, df_presto])
        # df["datetime"] = df.apply(
        #     lambda row: datetime.strptime(row["start_time"], "%Y-%m-%d %H:%M:%S.%f")
        #     if row["type"] == JobType.SPARK
        #     else datetime.strptime(row["start_time"], "%Y-%m-%d %H:%M:%S"),
        #     axis=1
        # )
        df['totalDataSize'] = df['inputDataSize'] + df['outputDataSize']
        df = df.sort_values(['start_time', 'job_id'])
        print("first 5 jobs", df.head())
        logging.info(f"Week {period_offset + 1}, starting on {start_date}")
        logging.info(f"# of jobs: {len(df['job_id'].unique())}")


        if debug:
            print(f"debug mode: retain first 3K rows", flush=True)
            # retain first 3K rows
            jobs = df.head(3000).groupby(['start_time', 'job_id'])
            print(jobs.head(1))
        else:
            jobs = df.groupby(['start_time', 'job_id'])

        # prepare scheduler with optimization results
        scheduler = Scheduler(dir_path=os.path.join(dir_path, f"test_run_c{c}_bw0.02_local{100-c}{label}"),
                              table_size_path='report-table-size-0907.csv' if start_date < datetime.strptime("2024-05-13", "%Y-%m-%d") else 'report-table-size-20241021.csv',
                              weight_lookup=weight_lookup) # TODO: stateful between periods
        #logging.info(f"hit rate: {scheduler.query_map.hit_rate()}")

        egress_byte_Presto = 0
        ingress_byte_Presto = 0
        egress_byte_Spark = 0
        ingress_byte_Spark = 0

        hybrid_job_count = 0
        hybrid_job_bytes = 0

        # enumerate jobs
        for (start_time, job_id), group in jobs:
            job_type = group['type'].iloc[0]
            if job_type == JobType.SPARK:
                cputime = group['cputime'].iloc[0]
            else:
                cputime = group['cputime'].sum()
            template_id = group['template_id'].iloc[0]
            table_volume_list = [(row['table'], row['inputDataSize'], row['outputDataSize']) for _, row in group.iterrows()]

            placement_y, egress_byte, ingress_byte = scheduler.place_query(template_id, cputime, table_volume_list,
                                                         policy=policy,
                                                         target_cloud_cpu_ratio=c / 100,
                                                         info=start_time)

            if job_type == JobType.SPARK:
                egress_byte_Spark += egress_byte
                ingress_byte_Spark += ingress_byte
            else:
                egress_byte_Presto += egress_byte
                ingress_byte_Presto += ingress_byte

            if egress_byte + ingress_byte > 0:
                hybrid_job_count += 1
                hybrid_job_bytes += sum([input + output for _, input, output in table_volume_list])

            if not traffic_rate_disabled:
                """ traffic rate """
                duration = group['duration'].iloc[0]
                if job_type == JobType.SPARK:
                    tStart = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S.%f")
                else:
                    tStart = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
                tEnd = tStart + timedelta(seconds=duration)
                start_minute = tStart.replace(second=0, microsecond=0)
                end_minute = (tEnd + timedelta(seconds=59)).replace(second=0, microsecond=0)

                # Flush expired minute buckets (older than job_start_minute)
                # TODO: we can not flush as 'start_time' (str) is not the correct index
                # flush_oldest_minute_buckets(minute_buckets, start_minute, os.path.join(dir_path, f"c{c}"))

                min = start_minute
                total_minute = (end_minute - start_minute).total_seconds() / 60
                while min < end_minute:
                    if min not in minute_buckets:
                        minute_buckets[min] = {'egress_byte_Presto': 0, 'ingress_byte_Presto': 0,
                                               'egress_byte_Spark': 0, 'ingress_byte_Spark': 0}
                        # minute_buckets[min] = {'egress_byte': 0, 'ingress_byte': 0}
                    if job_type == JobType.SPARK:
                        minute_buckets[min]['egress_byte_Spark'] += egress_byte / total_minute
                        minute_buckets[min]['ingress_byte_Spark'] += ingress_byte / total_minute
                    else:
                        minute_buckets[min]['egress_byte_Presto'] += egress_byte / total_minute
                        minute_buckets[min]['ingress_byte_Presto'] += ingress_byte / total_minute
                    # minute_buckets[min]['egress_byte'] += egress_byte / total_minute
                    # minute_buckets[min]['ingress_byte'] += ingress_byte / total_minute
                    min += timedelta(minutes=1)

        new_weight_group = df.groupby(['table']).agg(
            totalDataSize=('totalDataSize', 'mean')).reset_index()
        new_weight_lookup = new_weight_group.set_index('table').to_dict()['totalDataSize']
        weight_lookup.update(new_weight_lookup)

        logging.info(f"Egress {human_readable_size(egress_byte_Presto + egress_byte_Spark)}: "
                     f"Presto {human_readable_size(egress_byte_Presto)}, Spark {human_readable_size(egress_byte_Spark)}")
        logging.info(f"Ingress {human_readable_size(ingress_byte_Presto + ingress_byte_Spark)}: "
                     f"Presto {human_readable_size(ingress_byte_Presto)}, Spark {human_readable_size(ingress_byte_Spark)}")
        logging.info(f"# of hybrid jobs: {hybrid_job_count} with access bytes: {human_readable_size(hybrid_job_bytes)}")
        #logging.info(f"hit rate: {scheduler.query_map.hit_rate()}")

        # Log period statistics
        period_logs.append({
            "start_date": start_date,
            "end_date": start_date + timedelta(days=period_day-1),
            "scheduling_policy": policy,
            "c": c,
            "cloud_compute_ratio": scheduler.get_cloud_computation_ratio(),  # Store only the ratio
            "egress_byte_Presto": egress_byte_Presto,
            "ingress_byte_Presto": ingress_byte_Presto,
            "egress_byte_Spark": egress_byte_Spark,
            "ingress_byte_Spark": ingress_byte_Spark,
            "dir_path": dir_path,
            "opt_dir_path": os.path.join(dir_path, f"test_run_c{c}_bw0.02_local{100-c}{label}")
        })

    if not traffic_rate_disabled:
        # Flush remaining minute buckets
        flush_oldest_minute_buckets(minute_buckets, None, os.path.join(dir_path, f"c{c}"))

    # Now log all stored period statistics in a single batch
    for log_entry in period_logs:
        log_period_statistics(
            log_entry["start_date"],
            log_entry["end_date"],
            log_entry["scheduling_policy"],
            log_entry["c"],
            log_entry["cloud_compute_ratio"],  # Only store ratio instead of full scheduler object
            log_entry["egress_byte_Presto"],
            log_entry["ingress_byte_Presto"],
            log_entry["egress_byte_Spark"],
            log_entry["ingress_byte_Spark"],
            # log_entry["egress_byte"],
            # log_entry["ingress_byte"],
            log_entry["dir_path"],
            log_entry["opt_dir_path"],
            traffic_rate_disabled=traffic_rate_disabled
        )