scheduler.py [504:542]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    minute_buckets = OrderedDict()  # OrderedDict keeps minute order for easy popping

    # store logs for each period
    period_logs = []

    for period_offset in range(num_of_week):
        start_date = period_start + timedelta(days=period_offset * period_day)
        if period_offset == 0:
            label = ""
        else:
            label = "_" + (start_date - timedelta(days=period_day)).strftime("%m%d")

        # Header: start_time,job_id,template_id,duration,
        # uown_names,inputDataSize,cputime, type
        # TODO: this can be parallelized (see the sketch after this excerpt)
        df_presto = pd.concat([read_Presto(start_date + timedelta(days=i)) for i in range(period_day)])
        df_spark = pd.concat([read_Spark(start_date + timedelta(days=i)) for i in range(period_day)])

        df = pd.concat([df_spark, df_presto])
        # df["datetime"] = df.apply(
        #     lambda row: datetime.strptime(row["start_time"], "%Y-%m-%d %H:%M:%S.%f")
        #     if row["type"] == JobType.SPARK
        #     else datetime.strptime(row["start_time"], "%Y-%m-%d %H:%M:%S"),
        #     axis=1
        # )
        df['totalDataSize'] = df['inputDataSize'] + df['outputDataSize']
        df = df.sort_values(['start_time', 'job_id'])
        print("first 5 jobs", df.head())
        logging.info(f"Week {period_offset + 1}, starting on {start_date}")
        logging.info(f"# of jobs: {len(df['job_id'].unique())}")


        if debug:
            print(f"debug mode: retain first 3K rows", flush=True)
            # retain first 3K rows
            jobs = df.head(3000).groupby(['start_time', 'job_id'])
            print(jobs.head(1))
        else:
            jobs = df.groupby(['start_time', 'job_id'])
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
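
A minimal sketch of the parallelization the TODO above asks for. The per-day reads are independent, so they can be fanned out across a thread pool; read_period and max_workers are names introduced here for illustration, and this assumes read_Presto and read_Spark are I/O-bound and safe to call concurrently (swap in ProcessPoolExecutor if they turn out to be CPU-bound):

    from concurrent.futures import ThreadPoolExecutor
    from datetime import timedelta

    import pandas as pd

    def read_period(reader, start_date, period_day, max_workers=8):
        """Fetch one reader's logs for every day of the period concurrently."""
        days = [start_date + timedelta(days=i) for i in range(period_day)]
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            # map() yields results in input order, so the concatenated frame
            # keeps the same day ordering as the original serial loop
            frames = list(pool.map(reader, days))
        return pd.concat(frames)

    # drop-in replacement for the two serial list comprehensions:
    # df_presto = read_period(read_Presto, start_date, period_day)
    # df_spark  = read_period(read_Spark, start_date, period_day)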



scheduler.py [923:955]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    minute_buckets = OrderedDict()  # OrderedDict keeps minute order for easy popping

    # store logs for each period
    period_logs = []

    for period_offset in range(num_of_week):
        start_date = period_start + timedelta(days=period_offset * period_day)
        if period_offset == 0:
            label = ""
        else:
            label = "_" + (start_date - timedelta(days=period_day)).strftime("%m%d")

        # Header: start_time,job_id,template_id,duration,
        # uown_names,inputDataSize,cputime, type
        # TODO: this can be parallelized (see the sketch after the first excerpt)
        df_presto = pd.concat([read_Presto(start_date + timedelta(days=i)) for i in range(period_day)])
        df_spark = pd.concat([read_Spark(start_date + timedelta(days=i)) for i in range(period_day)])

        df = pd.concat([df_spark, df_presto])
        df['totalDataSize'] = df['inputDataSize'] + df['outputDataSize']
        df = df.sort_values(['start_time', 'job_id'])
        print("first 5 jobs", df.head())
        logging.info(f"Week {period_offset + 1}, starting on {start_date}")
        logging.info(f"# of jobs: {len(df['job_id'].unique())}")


        if debug:
            print(f"debug mode: retain first 3K rows", flush=True)
            # retain first 3K rows
            jobs = df.head(3000).groupby(['start_time', 'job_id'])
            print(jobs.head(1))
        else:
            jobs = df.groupby(['start_time', 'job_id'])
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
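
The first excerpt notes that Spark and Presto format start_time differently (with and without fractional seconds), yet both excerpts sort on the raw strings, and the minute_buckets structure declared at the top presumably keys on parsed timestamps. A minimal vectorized sketch of the parsing step, assuming pandas can infer these ISO-like formats (on pandas >= 2.0, pass format="mixed" if inference fails); sort_chronologically is a name introduced for illustration:

    import pandas as pd

    def sort_chronologically(df):
        """Parse start_time and order jobs by actual timestamps."""
        df = df.copy()
        # pd.to_datetime handles both "%Y-%m-%d %H:%M:%S.%f" and
        # "%Y-%m-%d %H:%M:%S" in a single vectorized pass, avoiding a
        # slow per-row df.apply with explicit strptime formats
        df["datetime"] = pd.to_datetime(df["start_time"])
        return df.sort_values(["datetime", "job_id"])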



