scheduler.py [393:433]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            if not traffic_rate_disabled:
                # Traffic rate: spread this job's bytes evenly across the
                # minute buckets it overlaps.
                duration = group['duration'].iloc[0]
                # Spark timestamps carry fractional seconds; Presto ones do not.
                if job_type == JobType.SPARK:
                    t_start = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S.%f")
                else:
                    t_start = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
                t_end = t_start + timedelta(seconds=duration)
                start_minute = t_start.replace(second=0, microsecond=0)
                end_minute = (t_end + timedelta(seconds=59)).replace(second=0, microsecond=0)

                # Flush expired minute buckets (older than start_minute).
                # TODO: we cannot flush yet because 'start_time' (a str) is not
                # a reliable ordering index, so older buckets may still receive traffic.
                # flush_oldest_minute_buckets(minute_buckets, start_minute, os.path.join(dir_path, f"c{c}"))

                minute = start_minute  # avoid shadowing the builtin min()
                total_minutes = (end_minute - start_minute).total_seconds() / 60
                while minute < end_minute:
                    if minute not in minute_buckets:
                        minute_buckets[minute] = {'egress_byte_Presto': 0, 'ingress_byte_Presto': 0,
                                                  'egress_byte_Spark': 0, 'ingress_byte_Spark': 0}
                    # Attribute an equal 1/total_minutes share of the job's
                    # traffic to every minute the job overlaps.
                    if job_type == JobType.SPARK:
                        minute_buckets[minute]['egress_byte_Spark'] += egress_byte / total_minutes
                        minute_buckets[minute]['ingress_byte_Spark'] += ingress_byte / total_minutes
                    else:
                        minute_buckets[minute]['egress_byte_Presto'] += egress_byte / total_minutes
                        minute_buckets[minute]['ingress_byte_Presto'] += ingress_byte / total_minutes
                    minute += timedelta(minutes=1)

        new_weight_group = df.groupby(['table']).agg(
            totalDataSize=('totalDataSize', 'mean')).reset_index()
        new_weight_lookup = new_weight_group.set_index('table').to_dict()['totalDataSize']
        weight_lookup.update(new_weight_lookup)

        logging.info(f"Egress {human_readable_size(egress_byte_Presto + egress_byte_Spark)}: "
                     f"Presto {human_readable_size(egress_byte_Presto)}, Spark {human_readable_size(egress_byte_Spark)}")
        logging.info(f"Ingress {human_readable_size(ingress_byte_Presto + ingress_byte_Spark)}: "
                     f"Presto {human_readable_size(ingress_byte_Presto)}, Spark {human_readable_size(ingress_byte_Spark)}")
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
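
For reference, here is a minimal, self-contained sketch of the even-spreading technique the loop above implements: a job's span is rounded outward to whole minutes, and its total egress/ingress bytes are divided equally among every minute bucket it overlaps. `spread_job_traffic` and the `suffix` parameter are illustrative names for this sketch, not part of scheduler.py.

from collections import defaultdict
from datetime import datetime, timedelta

def spread_job_traffic(buckets, t_start, duration_s, egress_byte, ingress_byte, suffix):
    # Round the job's span outward to whole minutes, mirroring the loop above.
    t_end = t_start + timedelta(seconds=duration_s)
    start_minute = t_start.replace(second=0, microsecond=0)
    end_minute = (t_end + timedelta(seconds=59)).replace(second=0, microsecond=0)
    total_minutes = (end_minute - start_minute).total_seconds() / 60
    minute = start_minute
    while minute < end_minute:
        # Each overlapped minute gets an equal 1/total_minutes share.
        buckets[minute][f'egress_byte_{suffix}'] += egress_byte / total_minutes
        buckets[minute][f'ingress_byte_{suffix}'] += ingress_byte / total_minutes
        minute += timedelta(minutes=1)

buckets = defaultdict(lambda: defaultdict(float))
spread_job_traffic(buckets, datetime(2024, 1, 1, 12, 0, 30), 150,
                   egress_byte=3_000_000, ingress_byte=600_000, suffix='Spark')
# The 150 s job runs 12:00:30-12:03:00, so three buckets (12:00, 12:01, 12:02)
# each receive one third of the egress and ingress bytes.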



scheduler.py [584:624]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            if not traffic_rate_disabled:
                # Traffic rate: spread this job's bytes evenly across the
                # minute buckets it overlaps.
                duration = group['duration'].iloc[0]
                # Spark timestamps carry fractional seconds; Presto ones do not.
                if job_type == JobType.SPARK:
                    t_start = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S.%f")
                else:
                    t_start = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
                t_end = t_start + timedelta(seconds=duration)
                start_minute = t_start.replace(second=0, microsecond=0)
                end_minute = (t_end + timedelta(seconds=59)).replace(second=0, microsecond=0)

                # Flush expired minute buckets (older than start_minute).
                # TODO: we cannot flush yet because 'start_time' (a str) is not
                # a reliable ordering index, so older buckets may still receive traffic.
                # flush_oldest_minute_buckets(minute_buckets, start_minute, os.path.join(dir_path, f"c{c}"))

                minute = start_minute  # avoid shadowing the builtin min()
                total_minutes = (end_minute - start_minute).total_seconds() / 60
                while minute < end_minute:
                    if minute not in minute_buckets:
                        minute_buckets[minute] = {'egress_byte_Presto': 0, 'ingress_byte_Presto': 0,
                                                  'egress_byte_Spark': 0, 'ingress_byte_Spark': 0}
                    # Attribute an equal 1/total_minutes share of the job's
                    # traffic to every minute the job overlaps.
                    if job_type == JobType.SPARK:
                        minute_buckets[minute]['egress_byte_Spark'] += egress_byte / total_minutes
                        minute_buckets[minute]['ingress_byte_Spark'] += ingress_byte / total_minutes
                    else:
                        minute_buckets[minute]['egress_byte_Presto'] += egress_byte / total_minutes
                        minute_buckets[minute]['ingress_byte_Presto'] += ingress_byte / total_minutes
                    minute += timedelta(minutes=1)

        new_weight_group = df.groupby(['table']).agg(
            totalDataSize=('totalDataSize', 'mean')).reset_index()
        new_weight_lookup = new_weight_group.set_index('table').to_dict()['totalDataSize']
        weight_lookup.update(new_weight_lookup)

        logging.info(f"Egress {human_readable_size(egress_byte_Presto + egress_byte_Spark)}: "
                     f"Presto {human_readable_size(egress_byte_Presto)}, Spark {human_readable_size(egress_byte_Spark)}")
        logging.info(f"Ingress {human_readable_size(ingress_byte_Presto + ingress_byte_Spark)}: "
                     f"Presto {human_readable_size(ingress_byte_Presto)}, Spark {human_readable_size(ingress_byte_Spark)}")
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
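
The commented-out flush call in both excerpts is blocked by the TODO: jobs are processed in an order keyed by the 'start_time' string, so a later-processed job can still land in a minute bucket that was already flushed. If the iteration order were fixed, a flush helper could look roughly like the hypothetical sketch below. The real flush_oldest_minute_buckets in scheduler.py is not shown in these excerpts and may differ; the CSV layout and append-mode persistence here are assumptions.

import csv
import os

def flush_oldest_minute_buckets(minute_buckets, cutoff_minute, out_path):
    # Hypothetical sketch: persist and drop every bucket strictly older than
    # cutoff_minute. Assumes minute_buckets maps datetime -> dict of the four
    # byte counters used above; the on-disk format is an assumption.
    expired = sorted(m for m in minute_buckets if m < cutoff_minute)
    if not expired:
        return
    fieldnames = ['minute', 'egress_byte_Presto', 'ingress_byte_Presto',
                  'egress_byte_Spark', 'ingress_byte_Spark']
    write_header = not os.path.exists(out_path)
    with open(out_path, 'a', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if write_header:
            writer.writeheader()
        for minute in expired:
            # pop() both persists and evicts the bucket in one step.
            writer.writerow({'minute': minute.isoformat(), **minute_buckets.pop(minute)})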



