scripts/figs.py (778 lines of code) (raw):

import os from datetime import datetime, timedelta import numpy as np import pandas as pd from matplotlib import pyplot as plt from matplotlib.ticker import FuncFormatter from pandas import date_range from utility import human_readable_size, parse_size plt.rcParams.update({'font.size': 20}) plt.rcParams['pdf.fonttype'] = 42 plt.rcParams['ps.fonttype'] = 42 # Default colors for Matplotlib colors_default = plt.rcParams['axes.prop_cycle'].by_key()['color'] replication_cost_color = colors_default[0] # Blue egress_cost_color = colors_default[1] # Orange network_cost_color = colors_default[2] # Green egress_traffic_color = colors_default[5] # Red ingress_traffic_color = colors_default[4] # Purple # Hatching patterns hatch_patterns = ['/', '\\', '|', '-', '+', 'x', 'o', 'O', '.', '*'] # Add mode for scale scale_mode = "linear" font_size = 20 def overall_stats(df: pd.DataFrame, tag: str): """Compute overall statistics for a given dataset. Args: df (pd.DataFrame): Input dataframe with necessary columns. tag (str): Name identifier (e.g., "Moirai") for the results. Returns: pd.DataFrame: DataFrame containing computed metrics. """ df = df[df['mode'] == 'size-predict'].copy() # Compute total job ingress/egress volume df['job_ingress_bytes'] = df['ingress_byte_Presto'] + df['ingress_byte_Spark'] df['job_egress_bytes'] = df['egress_byte_Presto'] + df['egress_byte_Spark'] # Compute total ingress and egress volume (including movement) df['ingress_volume'] = df['job_ingress_bytes'] + df['movement_ingress_bytes'] df['egress_volume'] = df['job_egress_bytes'] + df['movement_egress_bytes'] df['traffic_volume'] = df['ingress_volume'] + df['egress_volume'] # Compute cost components df['ingress_volume_Spark'] = df['ingress_byte_Spark'] + df['movement_ingress_bytes'] df['ingress_volume_Presto'] = df['ingress_byte_Presto'] + df['movement_ingress_bytes'] df['egress_volume_Spark'] = df['egress_byte_Spark'] + df['movement_egress_bytes'] df['egress_volume_Presto'] = df['egress_byte_Presto'] + df['movement_egress_bytes'] df['cost'] = df['egress_volume_Presto'] / 1024 ** 3 * 0.02 + \ df['egress_volume_Spark'] / 1024 ** 3 * 0.02 + \ df['rep_bytes'] / 1024 ** 3 * 0.023 / 4 results = [] for c in df['cloud_computation_target'].unique(): df_c = df[df['cloud_computation_target'] == c] # Compute costs network_cost = df_c['P95_traffic_bps'].max() / (100 * 1024 ** 3) * 23.3 * 24 * 7 egress_cost_presto = df_c['egress_volume_Presto'].mean() / 1024 ** 3 * 0.02 egress_cost_spark = df_c['egress_volume_Spark'].mean() / 1024 ** 3 * 0.02 rep_cost = df_c['rep_bytes'].mean() / 1024 ** 3 * 0.023 / 4 # total_cost = network_cost + egress_cost_spark + egress_cost_presto + rep_cost total_cost = df_c['cost'].mean() + network_cost # Compute standard deviation for total cost (variance) total_cost_std = df_c['cost'].std() # Append results results.append({ "tag": tag, "c": c, "network_cost": network_cost, "egress_cost_Spark": egress_cost_spark, "egress_cost_Presto": egress_cost_presto, "egress_cost": egress_cost_spark + egress_cost_presto, "rep_cost": rep_cost, "total_cost": total_cost, "total_cost_std": total_cost_std, # Add standard deviation column "ingress_volume": df_c['ingress_volume'].mean() / 1024 ** 4, "egress_volume": df_c['egress_volume'].mean() / 1024 ** 4, }) return pd.DataFrame(results) def draw_overall_new(front: bool = False, job: bool = False, pr: bool = False): assert not front or not pr or not job, "Only one of front, job, and pr can be True" todo_dfs = [] # header: tag, c, network_cost, egress_cost_Spark, egress_cost_Presto, rep_cost, # ingress_volume_Spark, ingress_volume_Presto, egress_volume_Spark, egress_volume_Presto baseline_df = pd.read_csv(f'../baselines_done/log.csv') if front: suffix = "_front" todo_dfs.append(overall_stats(pd.read_csv(f'../sample_1.000_rep0.002/log.csv'), "Moirai\n(Our)")) todo_dfs.append(overall_stats(pd.read_csv(f'../yugong_results_rep0.000/log.csv'), "Yugong\n(Alibaba)")) todo_dfs.append(overall_stats(baseline_df[baseline_df['tag'] == "No\nRep"], "No Rep\n(Spotify)")) todo_dfs.append(overall_stats(baseline_df[baseline_df['tag'] == "3M\n21%"], "Rep 3Mon.\n(Twitter)")) elif job: suffix = "_job" todo_dfs.append(overall_stats(baseline_df[baseline_df['tag'] == "Volley\n2.5%"], "Volley\nRepTop2.5%")) todo_dfs.append(overall_stats(baseline_df[baseline_df['tag'] == "MoiJob\n0.2%"], "Moi-\nJobDist")) todo_dfs.append(overall_stats(pd.read_csv(f'../sample_1.000_rep0.002/log.csv'), "Moirai")) elif pr: suffix = "_pr" todo_dfs.append(overall_stats(baseline_df[baseline_df['tag'] == "MoiJob\n0.2%"], "Moi\nJobDist")) for rate in [0.001, 0.002, 0.004]: df = pd.read_csv(f'../sample_1.000_rep{rate:.3f}/log.csv') todo_dfs.append(overall_stats(df, f"Moi\nPR{rate * 100:.1f}%")) for sample_rate in [0.010, 0.050]: # 0.001, df = pd.read_csv(f'../sample_{sample_rate:.3f}/log.csv') todo_dfs.append(overall_stats(df, f"Moi\n{sample_rate * 100:.0f}%Job")) else: suffix = "" todo_dfs.append(overall_stats(pd.read_csv(f'../sample_1.000_rep0.002/log.csv'), "Moirai")) todo_dfs.append(overall_stats(pd.read_csv(f'../yugong_results_rep0.000/log.csv'), "Yugong")) todo_dfs.append(overall_stats(baseline_df[baseline_df['tag'] == "Volley\n0%"], "Volley")) todo_dfs.append(overall_stats(baseline_df[baseline_df['tag'] == "RTD\n2.5%"], "Rep\nTop2.5%")) todo_dfs.append(overall_stats(baseline_df[baseline_df['tag'] == "No\nRep"], "No\nRep")) todo_dfs.append(overall_stats(baseline_df[baseline_df['tag'] == "3M\n21%"], "Rep\n3Mon.")) #todo_dfs.append(overall_stats(baseline_df[baseline_df['tag'] == "Volley\n2.5%"], "Volley\nATD")) df = pd.concat(todo_dfs) df.to_csv('overall_stats_new.csv', index=False) for c in df['c'].unique(): if front: df_c = df[df['c'] == c].set_index('tag').loc[["No Rep\n(Spotify)", "Rep 3Mon.\n(Twitter)", "Yugong\n(Alibaba)", "Moirai\n(Our)"]] # Ensure order elif job: df_c = df[df['c'] == c].set_index('tag').loc[["Volley\nRepTop2.5%", "Moi-\nJobDist", "Moirai"]] elif pr: df_c = df[df['c'] == c].set_index('tag').loc[["Moi\nJobDist", "Moi\nPR0.1%", "Moi\nPR0.2%", "Moi\nPR0.4%", "Moi\n1%Job", "Moi\n5%Job"]] # "Volley\nATD", else: df_c = df[df['c'] == c].set_index('tag').loc[["No\nRep", "Volley", "Rep\n3Mon.", "Rep\nTop2.5%", "Yugong", "Moirai"]] # Ensure order "Volley\nATD", print(df_c) # Create subplots if front: fig, ax1 = plt.subplots(1, 1, figsize=(6, 4.5), constrained_layout=True) else: fig, (ax2, ax1) = plt.subplots(1, 2, figsize=(11, 4), constrained_layout=True) # ---- PLOT 1: Cost Breakdown ---- # df_costs = df_c[['egress_cost', 'rep_cost', 'network_cost']] # df_costs.set_index('tag', inplace=True) df_costs.plot(kind='bar', stacked=True, ax=ax1, color=[ replication_cost_color, egress_cost_color, network_cost_color]) # Apply hatch patterns to the bars for bar, hatch in zip(ax1.containers, hatch_patterns[:len(df_costs.columns)]): for patch in bar.patches: patch.set_hatch(hatch) # Add total sum as a single number on top of each bar for idx, rects in enumerate(zip(*ax1.containers)): # Stacked bars total_height = sum(rect.get_height() for rect in rects) if total_height > 0: ax1.text(rects[0].get_x() + rects[0].get_width() / 2, total_height, f'{total_height / 1000:.0f}K' if total_height < 1000000 else f'{total_height / 1000**2:.1f}M', ha='center', va='bottom', fontsize=font_size - 2, color='black') if not front and not pr and not job: for idx, tag in enumerate(["No\nRep", "Volley", "Rep\n3Mon.", "Rep\nTop2.5%", "Volley\nRepTop2.5%"]): if tag in df_costs.index: total_cost = df_c.loc[tag, "total_cost"] total_cost_std = df_c.loc[tag, "total_cost_std"] ax1.errorbar(x=idx, y=total_cost, yerr=total_cost_std, color='black', capsize=5, label="Std Dev" if idx == 0 else "") ax1.set_ylabel("Weekly Cost ($)", fontsize=font_size) ax1.set_xlabel(None) ax1.tick_params(rotation=0, labelsize=font_size - 2) if not front: ax1.set_xticklabels(df_costs.index, fontsize=font_size - 5, rotation=15) else: ax1.set_xticklabels(df_costs.index, fontsize=font_size - 3, rotation=0) if pr or job: yticks = [0, 20 * 1000, 40 * 1000, 60 * 1000, 80 * 1000, 100 * 1000, 120 * 1000] ytick_labels = ["0", "20K", "40K", "60K", "80K", "100K", "120K"] else: yticks = [0, 300 * 1000, 600 * 1000, 900 * 1000, 1200 * 1000, 1500 * 1000] ytick_labels = ["0", "300K", "600K", "900K", "1200K", "1500K"] ax1.set_yticks(yticks) ax1.set_yticklabels(ytick_labels, fontsize=font_size - 2) if c == 30 or front: ax1.legend(["Egress", "Replication", "Network"], fontsize=font_size - 2, ncol=1) # , loc='upper center' else: ax1.get_legend().remove() ax1.grid(axis='y') # ---- PLOT 2: Traffic Breakdown ---- # if not front: df_traffic = df_c[ ['ingress_volume', 'egress_volume']] # df_traffic.set_index('tag', inplace=True) df_traffic.plot(kind='bar', stacked=True, ax=ax2, color=[ ingress_traffic_color, egress_traffic_color]) # Apply hatch patterns to the traffic bars for bar, hatch in zip(ax2.containers, hatch_patterns[len(df_costs.columns):]): for patch in bar.patches: patch.set_hatch(hatch) # Add total sum as a single number on top of each bar for idx, rects in enumerate(zip(*ax2.containers)): # Stacked bars total_height = sum(rect.get_height() for rect in rects) if total_height > 1024: ax2.text(rects[0].get_x() + rects[0].get_width() / 2, total_height, f'{total_height / 1024:.1f}PB', ha='center', va='bottom', fontsize=font_size - 6, color='black') else: ax2.text(rects[0].get_x() + rects[0].get_width() / 2, total_height, f'{total_height:.0f}TB', ha='center', va='bottom', fontsize=font_size - 6, color='black') ax2.set_ylabel("Weekly Traffic", fontsize=font_size) ax2.set_xlabel(None) ax2.tick_params(rotation=0, labelsize=font_size - 2) ax2.set_xticklabels(df_costs.index, fontsize=font_size - 5, rotation=15) if pr or job: yticks = [0, 2 * 1024, 4 * 1024, 6 * 1024, 8 * 1024] ytick_labels = ["0", "2PB", "4PB", "6PB", "8PB"] else: yticks = [0, 30 * 1024, 60 * 1024, 90 * 1024, 120 * 1024] ytick_labels = ["0", "30PB", "60PB", "90PB", "120PB"] ax2.set_yticks(yticks) ax2.set_yticklabels(ytick_labels, fontsize=font_size - 2) if c == 30: ax2.legend(["Ingress Volume", "Egress Volume"], fontsize=font_size - 3, ncol=1) else: ax2.get_legend().remove() ax2.grid(axis='y') # title # if not front: # fig.suptitle(f"On-premises:Cloud {100 - c}%:{c}%", fontsize=font_size + 2) # Save the figure plt.savefig(f'overall_comparison_c_{c}{suffix}.pdf') plt.close() print(f"Saved overall_comparison_c_{c}{suffix}.pdf") def draw_job_routing(): def process(df: pd.DataFrame, tag: str): # header: period,mode,cloud_computation_ratio,cloud_computation_target, # ingress_byte_Presto,egress_byte_Presto,ingress_byte_Spark,egress_byte_Spark, # P90_traffic_bps,P95_traffic_bps,P99_traffic_bps, # movement_ingress_bytes,movement_egress_bytes,rep_bytes,sample_rate df['egress_volume'] = (df['egress_byte_Presto'] + df['egress_byte_Spark'] + df['movement_egress_bytes']) df['egress_cost'] = df['egress_volume'] / 1024 ** 3 * 0.02 df['traffic_volume'] = (df['ingress_byte_Presto'] + df['ingress_byte_Spark'] + df['egress_volume'] + df['movement_ingress_bytes']) df['tag'] = tag df = df[['tag', 'cloud_computation_target', 'traffic_volume', 'egress_cost', 'mode']] return df colors = ['darkorange', 'blue', 'dodgerblue', 'cyan'] medianprops = dict(linestyle='-', linewidth=2, color='gold') # header: mode,cloud_computation_target,traffic_volume,egress_cost df_moirai = process(pd.read_csv('../sample_1.000_rep0.002/log.csv'), "Moirai") df_yugong = process(pd.read_csv('../yugong_results_rep0.002/log.csv'), "Yugong") # fig(a): weekly traffic volume fig, axes = plt.subplots(1, 3, figsize=(12, 5.5), sharey=True) for idx, c in enumerate([30, 50, 70]): ax = axes[idx] df_c = pd.concat([df_moirai[df_moirai['cloud_computation_target'] == c], df_yugong[df_yugong['cloud_computation_target'] == c]]) box_data = [] box_data.append(df_c[df_c['tag'] == 'Yugong']['traffic_volume']) # Yugong for mode in ['independent', 'size-unaware','size-predict', 'size-aware']: box_data.append(df_c[(df_c['mode'] == mode) & (df_c['tag'] == 'Moirai')]['traffic_volume']) positions = [1, 2.2, 2.5, 2.9, 3.3] bp = ax.boxplot(box_data, patch_artist=True, positions=positions, widths=0.2, showfliers=False, showmeans=True, whis=[10, 90], medianprops=medianprops) # Set boxplot colors for patch, color in zip(bp['boxes'], colors): patch.set_facecolor(color) # Set title and labels ax.set_title(f"On-prem:Cloud\n{100 - c}%:{c}%", fontsize=font_size-2) ax.set_xlabel(None) ax.set_yscale('log', base=2) ax.set_xticks([1, 2.7]) ax.set_xticklabels(['Yugong', 'Moirai'], fontsize=font_size - 2) ax.set_yticks([2** 40 * 2 ** i for i in [6, 7, 8, 9, 10, 13, 15, 16, 17]]) ax.set_yticklabels([f"{2 ** i:.0f}TB" if i < 10 else f"{2 ** i / 1024:.0f}PB" for i in [6, 7, 8, 9, 10, 13, 15, 16, 17]], fontsize=font_size - 2) ax.axhline(y=11.5 * 1024**5, color='red', linestyle='--', linewidth=1.4) if c == 30: ax.text(0.06, 10.5 * 1024**5, '11.5PB', ha='center', fontsize=font_size - 2, color='red', rotation=0) ax.text(2.5, 13 * 1024**5, 'Network threshold', ha='center', fontsize=font_size - 6, color='red', rotation=0) ax.set_ylabel("Weekly Traffic Volume (log)") ax.tick_params(axis='x', labelsize=font_size - 2) ax.grid(axis='y') ax.text(2.2, np.percentile(box_data[1], 90) * 1.07, 'Indep', ha='center', fontsize=font_size - 6, color='black') ax.text(1.9, np.percentile(box_data[2], 90) * 0.6, 'Size\nUnaware', ha='center', fontsize=font_size - 6,) ax.text(2.95, np.percentile(box_data[3], 90) * 1.1, 'Size\nPredict', ha='center', fontsize=font_size - 6, color='black') ax.text(3.3, np.percentile(box_data[4], 40) * 0.3, 'Size\nOracular', ha='center', fontsize=font_size - 6, color='black') plt.tight_layout() plt.savefig('routing_traffic.pdf') plt.close() # fig(b): weekly egress cost fig, axes = plt.subplots(1, 3, figsize=(12, 5.5), sharey=True) for idx, c in enumerate([30, 50, 70]): ax = axes[idx] df_c = pd.concat([df_moirai[df_moirai['cloud_computation_target'] == c], df_yugong[df_yugong['cloud_computation_target'] == c]]) box_data = [] box_data.append(df_c[df_c['tag'] == 'Yugong']['egress_cost']) # Yugong for mode in ['independent', 'size-unaware', 'size-predict', 'size-aware']: box_data.append(df_c[(df_c['mode'] == mode) & (df_c['tag'] == 'Moirai')]['egress_cost']) positions = [1, 2.2, 2.5, 2.9, 3.3] bp = ax.boxplot(box_data, patch_artist=True, positions=positions, widths=0.2, showfliers=False, showmeans=True, whis=[10, 90], medianprops=medianprops) # Set boxplot colors for patch, color in zip(bp['boxes'], colors): patch.set_facecolor(color) # Set title and labels ax.set_title(f"On-prem:Cloud\n{100 - c}%:{c}%", fontsize=font_size - 2) ax.set_xlabel(None) ax.set_yscale('log', base=2) ax.set_xticks([1, 2.7]) ax.set_xticklabels(['Yugong', 'Moirai'], fontsize=font_size - 2) ax.set_yticks([200, 500, 2000, 5000, 50000, 500000, 5000000]) ax.set_yticklabels(["$200", "$500", "$2K", "$5K", "$50K", "$500K", "$5M"], fontsize=font_size - 2) if c == 30: ax.set_ylabel("Weekly Egress Cost (log)") ax.tick_params(axis='x', labelsize=font_size - 2) ax.grid(axis='y') ax.text(2.2, np.percentile(box_data[1], 90) * 1.07, 'Indep', ha='center', fontsize=font_size - 6, color='black') ax.text(1.9, np.percentile(box_data[2], 40), 'Size\nUnaware', ha='center', fontsize=font_size - 6, ) ax.text(2.95, np.percentile(box_data[3], 90) * 1.1, 'Size\nPredict', ha='center', fontsize=font_size - 6, color='black') ax.text(3.3, np.percentile(box_data[4], 40) * 0.3, 'Size\nOracular', ha='center', fontsize=font_size - 6, color='black') plt.tight_layout() plt.savefig('routing_cost.pdf') plt.close() def draw_traffic_rate(single=True): def traffic_rate_stats(df: pd.DataFrame, tag): # header: period,mode,cloud_computation_ratio,cloud_computation_target, # ingress_byte_Presto,egress_byte_Presto,ingress_byte_Spark,egress_byte_Spark, # P90_traffic_bps,P95_traffic_bps,P99_traffic_bps, # movement_ingress_bytes,movement_egress_bytes,rep_bytes,sample_rate df = df[df['mode'] == 'size-predict'].copy() df['traffic_bytes'] = (df['ingress_byte_Presto'] + df['ingress_byte_Spark'] + df['egress_byte_Presto'] + df['egress_byte_Spark'] + df['movement_ingress_bytes'] + df['movement_egress_bytes']) df['avg_traffic_bps'] = df['traffic_bytes'] * 8 / 7 / 24 / 3600 # Extract start date from period (YYYYMMDD format) df['start_date'] = df['period'].str[:8] # Extract YYYYMMDD df['start_date'] = pd.to_datetime(df['start_date'], format='%Y%m%d') # Convert to datetime # Calculate week_id based on 2024-10-22 as the reference date reference_date = datetime(2024, 10, 22) df['week_id'] = ((df['start_date'] - reference_date).dt.days // 7 + 1).astype(int) # Compute week index df['tag'] = tag df.rename(columns={'P90_traffic_bps': 'P90', 'P95_traffic_bps': 'P95', 'P99_traffic_bps': 'P99'}, inplace=True) df['P90'] = df['P90'] / 1024 ** 3 df['P95'] = df['P95'] / 1024 ** 3 df['P99'] = df['P99'] / 1024 ** 3 df['avg_traffic_bps'] = df['avg_traffic_bps'] / 1024 ** 3 return df[['tag', 'week_id', 'cloud_computation_target', 'avg_traffic_bps', 'P90', 'P95', 'P99']] colors = { "Yugong": colors_default[0], "Moirai": colors_default[1] } markers = { 'Yugong': 's', 'Moirai': '*' } todo_dfs = [] #for rate in [0.001, 0.002, 0.004]: for rate in [0.002]: df = pd.read_csv(f'../sample_1.000_rep{rate:.3f}/log.csv') todo_dfs.append(traffic_rate_stats(df, f"Moirai")) for rep_rate in [0.002]: df = pd.read_csv(f'../yugong_results_rep{rep_rate:.3f}/log.csv') todo_dfs.append(traffic_rate_stats(df, f"Yugong")) # Concatenate all processed data # header: period,cloud_computation_target,avg_traffic_bps,P90_traffic_bps,P95_traffic_bps,P99_traffic_bps df = pd.concat(todo_dfs) df.to_csv('traffic_rate_stats.csv', index=False) for metric in ['P90', 'P95', 'P99']: if metric == 'P90': ylim = 600 elif metric == 'P95': ylim = 900 else: ylim = 1500 if single: fig, axes = plt.subplots(nrows=1, ncols=1, figsize=(10, 5)) c_list = [50] else: # Create a figure with 3 subplots (1 row, 3 columns) fig, axes = plt.subplots(nrows=1, ncols=3, figsize=(22, 7)) c_list = [30, 50, 70] for idx, c in enumerate(c_list): df_c = df[df['cloud_computation_target'] == c] if single: ax = axes else: ax = axes[idx] for tag in ['Yugong', 'Moirai']: sub_df = df_c[df_c['tag'] == tag].copy() sub_df.sort_values(by='week_id', inplace=True) # Plot the PXX traffic and the average traffic for comparison ax.plot(sub_df['week_id'], sub_df['avg_traffic_bps'], linestyle='--', label=f'{tag} Avg', color=colors[tag], marker=markers[tag], markersize=8) ax.plot(sub_df['week_id'], sub_df[metric], linestyle='-', label=f'{tag} {metric}', color=colors[tag], marker=markers[tag], markersize=8) # Set title and labels if not single: ax.set_title(f"On-prem:Cloud={100 - c}%:{c}%") ax.set_xlabel('Week') ax.axhline(y=160, color='red', linestyle='--', linewidth=2) if idx == 0: ax.set_ylabel('Traffic Rate (Gbps)') ax.legend(fontsize=font_size - 3, ncol=2) ax.text(0.65, 120, '160', color='red', ha='center', fontsize=20) ax.text(5, 110, 'Network threshold', color='red', ha='center', fontsize=20) else: ax.set_ylabel(None) ax.set_yticklabels([]) ax.set_ylim(0, ylim) ax.grid(axis='y') # Create a secondary y-axis for cost ax2 = ax.twinx() ax2.set_ylim(0, ylim / 100 * 7 * 24 * 23.3) for tag in ['Yugong', 'Moirai']: sub_df = df_c[df_c['tag'] == tag].copy() sub_df.sort_values(by='week_id', inplace=True) ax2.plot(sub_df['week_id'], sub_df[metric] / 100 * 7 * 24 * 23.3, linestyle='-', label=f'{tag} Cost', color=colors[tag], marker=markers[tag], markersize=8) if idx == len(c_list)-1: yticks = ax2.get_yticks() ytick_labels = [f"{int(i/1000)}K" for i in yticks] ax2.set_yticklabels(ytick_labels) ax2.set_ylabel('Weekly Network Cost ($)') else: ax2.set_yticklabels([]) # Save the figure plt.tight_layout() plt.savefig(f'traffic_rate_{metric}.pdf', bbox_inches='tight') plt.close() print(f"Saved traffic_rate_{metric}.pdf") def replication_effects(): # header: abstractFingerPrint,db_name,table_name,inputDataSize,cputime,outputDataSize # presto_df = pd.read_csv('../newTraces/report-abFP-volume-table-20241022-20241028-Presto.csv') # presto_df['db_table'] = presto_df['db_name'] + '.' + presto_df['table_name'] # spark_df = pd.read_csv('../newTraces/report-abFP-volume-table-20241022-20241028-Spark.csv') # spark_df['db_table'] = spark_df['db_name'] + '.' + spark_df['table_name'] presto_df = pd.read_csv('../newTraces/report-abFP-volume-table-20250114-20250120-Presto.csv') presto_df['db_table'] = presto_df['db_name'] + '.' + presto_df['table_name'] spark_df = pd.read_csv('../newTraces/report-abFP-volume-table-20250114-20250120-Spark.csv') spark_df['db_table'] = spark_df['db_name'] + '.' + spark_df['table_name'] for rep_rate in [0.02, 0.002]: print(f'Rep: {rep_rate}') for strategy in [ # 'read_traffic_volume','inverse_dataset_size', # 'job_access_frequency', # 'read_traffic_density', 'job_access_density' ]: path = f"../sample_1.000_rep{rep_rate:.3f}_strategies/replicated_tables_{str(rep_rate)}_{strategy}.csv" if not os.path.exists(path): continue rep_list = pd.read_csv(path)[f'replicated_tables'].to_list() effective_presto_df = presto_df[~presto_df['db_table'].isin(rep_list)] effective_spark_df = spark_df[~spark_df['db_table'].isin(rep_list)] print(f"Strategy: {strategy}") #print("Presto # of edges, all:", len(presto_df), "effective:", len(presto_df) - len(reduced_presto_df)) #print("Spark # of edges, all:", len(spark_df), "effective:", len(spark_df) - len(reduced_spark_df)) print("# effective edges", len(effective_spark_df) + len(effective_presto_df)) print("# of effective jobs", effective_spark_df['abstractFingerPrint'].nunique() + effective_presto_df['abstractFingerPrint'].nunique()) print("# of unique db_tables", pd.concat([effective_spark_df, effective_presto_df])['db_table'].nunique()) for rep_rate in [0.001, 0.002, 0.004]: rep_list = pd.read_csv(f"../sample_1.000_rep{rep_rate:.3f}/replicated_tables.csv")['replicated_tables'].to_list() reduced_presto_df = presto_df[presto_df['db_table'].isin(rep_list)] reduced_spark_df = spark_df[spark_df['db_table'].isin(rep_list)] print(f"Replication rate: {rep_rate:.3f}") print("Presto # of edges, all:", len(presto_df), "affected:", len(reduced_presto_df)) print("Spark # of edges, all:", len(spark_df), "affected:", len(reduced_spark_df)) # header: abstractFingerPrint,db_name,table_name,inputDataSize,outputDataSize,cputime yugong_df = pd.read_csv('../yugongTraces/report-uown-volume-table-20241022-20241028.csv') yugong_df['db_table'] = yugong_df['db_name'] + '.' + yugong_df['table_name'] for rep_rate in [0.004]: rep_list = pd.read_csv(f"../yugong_results_rep{rep_rate:.3f}/replicated_tables_0.004.csv")['replicated_tables'].to_list() reduced_yugong_df = yugong_df[yugong_df['db_table'].isin(rep_list)] print(f"Replication rate: {rep_rate:.3f}") print("Yugong # of edges, all:", len(yugong_df), "affected:", len(reduced_yugong_df)) def verify_traffic_rate(yugong: bool = False): traffic_rate = 0 start_date = datetime(year=2024, month=10, day=29) + timedelta(days=7*8) print("Start date:", start_date) for date in date_range(start=start_date, end=start_date + timedelta(days=6), freq='D'): if yugong: df = pd.read_csv(f'../yugong_results_rep0.002/c30/traffic_{date.strftime("%Y%m%d")}.csv') else: df = pd.read_csv(f'../sample_1.000_rep0.001/c30/traffic_{date.strftime("%Y%m%d")}.csv') df['traffic_rate'] = df['egress_rate_presto_bps'] + df['egress_rate_spark_bps'] + \ df['ingress_rate_presto_bps'] + df['ingress_rate_spark_bps'] traffic_rate += df['traffic_rate'].sum() weekly_traffic = traffic_rate / 8 * 60 print("Weekly traffic:", human_readable_size(weekly_traffic)) all_traffic_rates = [] for single_date in pd.date_range(start=start_date, end=start_date + timedelta(days=6), freq='D'): if yugong: traffic_file = os.path.join("../yugong_results_rep0.002/c30", f"traffic_{single_date.strftime('%Y%m%d')}.csv") else: traffic_file = os.path.join("../sample_1.000_rep0.001/c30", f"traffic_{single_date.strftime('%Y%m%d')}.csv") if os.path.exists(traffic_file): df = pd.read_csv(traffic_file) df['egress_rate_bps'] = df['egress_rate_presto_bps'] + df['egress_rate_spark_bps'] df['ingress_rate_bps'] = df['ingress_rate_presto_bps'] + df['ingress_rate_spark_bps'] df['traffic_rate'] = df['egress_rate_bps'] + df['ingress_rate_bps'] all_traffic_rates.extend(df["traffic_rate"].tolist()) # all_traffic_rates.extend(df["egress_rate_bps"].tolist()) # all_traffic_rates.extend(df["ingress_rate_bps"].tolist()) else: print(f"Traffic file not found: {traffic_file}") print("# traffic rates:", len(all_traffic_rates)) print("P90", int(np.percentile(all_traffic_rates, 90)), "P95", int(np.percentile(all_traffic_rates, 95)), "P99", int(np.percentile(all_traffic_rates, 99)),) # # Convert to NumPy array and sort for CDF # all_traffic_rates = np.array(all_traffic_rates) # sorted_data = np.sort(all_traffic_rates) # cdf = np.arange(1, len(sorted_data) + 1) / len(sorted_data) # # # Plot CDF # plt.figure(figsize=(8, 6)) # plt.plot(sorted_data, cdf, marker='.', linestyle='none') # plt.xlabel('Traffic Rate (bps)') # plt.ylabel('CDF') # plt.title('CDF of Traffic Rates') # plt.grid(True) # plt.xscale('log') # plt.savefig('traffic_rate_cdf.png') def plot_weekly_traffic(week_id: int, yugong: bool = False, c: int = 30): # Compute start date of the given week (week_id=2 starts on 2024-10-22) base_date = datetime(2024, 10, 29) start_date = base_date + timedelta(days=7 * (week_id - 2)) print("Start date:", start_date) traffic_rates = [] # Iterate through the 7 days of the given week for i in range(7): current_date = start_date + timedelta(days=i) file_date_str = current_date.strftime('%Y%m%d') # Construct file path if yugong: traffic_file = f"../yugong_results_rep0.002/c{c}/traffic_{file_date_str}.csv" else: traffic_file = f"../sample_1.000_rep0.001/c{c}/traffic_{file_date_str}.csv" # Read traffic data if the file exists if os.path.exists(traffic_file): df = pd.read_csv(traffic_file) df['traffic_rate'] = (df['egress_rate_presto_bps'] + df['egress_rate_spark_bps'] + df['ingress_rate_presto_bps'] + df['ingress_rate_spark_bps']) traffic_rates.extend(df['traffic_rate'].tolist()) else: print(f"Traffic file not found: {traffic_file}") # Plot the traffic rate over the week plt.figure(figsize=(10, 5)) plt.plot(traffic_rates, linestyle='-', color='blue') plt.xlabel("Minute bucket") plt.ylabel("Traffic Rate (bps)") plt.ylim(0, 2*10**12) plt.title(f"Traffic Rate Over Week {week_id}") plt.grid(True) plt.tight_layout() plt.savefig(f'traffic_week_{week_id}.png') def draw_growth(format="png"): def compute_manual_linear_slope(x, y): """Computes the slope of the best-fit line using least squares regression.""" n = len(x) mean_x, mean_y = np.mean(x), np.mean(y) numerator = np.sum((x - mean_x) * (y - mean_y)) denominator = np.sum((x - mean_x) ** 2) slope = numerator / denominator if denominator != 0 else 0 return slope # Function to compute and plot linear fit manually def plot_manual_linear_fit(x, y, label, ax, color): """Computes and plots a manual least squares linear fit.""" # Compute slope and intercept manually slope = compute_manual_linear_slope(x, y) intercept = np.mean(y) - slope * np.mean(x) # Generate fitted line linear_fit = slope * x + intercept ax.plot(x, linear_fit, linestyle="-", label=label, color=color) print(label, "slope:", slope) # Function to format tick labels def format_ticks(value, _): if value >= 1e12: return f"{value / 1e12:.1f}T" elif value >= 1e9: return f"{value / 1e9:.1f}B" elif value >= 1e6: return f"{value / 1e6:.1f}M" elif value >= 1e3: return f"{value / 1e3:.0f}K" else: return str(int(value)) # Load Presto and Spark data presto_df = pd.read_csv("../metrics_per_day_presto.csv", parse_dates=['date']) spark_df = pd.read_csv("../metrics_per_day_spark.csv", parse_dates=['date']) # # Compute totals # presto_total_jobs = presto_df['daily_jobs'].sum() # spark_total_jobs = spark_df['daily_jobs'].sum() # # presto_total_rw_bytes = presto_df['daily_read_volume'].sum() # spark_total_rw_bytes = spark_df['daily_read_volume'].sum() + spark_df['daily_write_volume'].sum() # # print("=== Totals Over the Period ===") # print(f"Presto: {presto_total_jobs:,} jobs, {presto_total_rw_bytes / 1024 ** 5:.2f} PB read") # print(f"Spark: {spark_total_jobs:,} jobs, {spark_total_rw_bytes / 1024 ** 5:.2f} PB read+write") # Sort by date presto_df.sort_values('date', inplace=True) spark_df.sort_values('date', inplace=True) # Compute days elapsed since the first date min_date = min(presto_df['date'].min(), spark_df['date'].min()) presto_df['days_elapsed'] = (presto_df['date'] - min_date).dt.days spark_df['days_elapsed'] = (spark_df['date'] - min_date).dt.days # Plot 1: Number of daily jobs plt.figure(figsize=(10, 4)) plt.tick_params(axis='both', labelsize=font_size - 2) plt.plot(presto_df['days_elapsed'], presto_df['daily_jobs'], label="Presto", color='blue', linestyle='--') plot_manual_linear_fit(presto_df['days_elapsed'].values, presto_df['daily_jobs'].values, "Trend line", plt.gca(), 'blue') plt.plot(spark_df['days_elapsed'], spark_df['daily_jobs'], label="Spark", color='orange', linestyle='--') plot_manual_linear_fit(spark_df['days_elapsed'].values, spark_df['daily_jobs'].values, "Trend line", plt.gca(), 'orange') plt.text(50, 280*1000, "30% annual increase", fontsize=font_size - 2, color='red') plt.annotate('', xy=(58, 395 * 1000), xytext=(55, 310 * 1000), arrowprops=dict(arrowstyle="->", color='red', lw=2)) plt.annotate('', xy=(60, 240 * 1000), xytext=(55, 275 * 1000), arrowprops=dict(arrowstyle="->", color='red', lw=2)) plt.xlabel("Day", fontsize=font_size) plt.ylabel("# of Daily Jobs", fontsize=font_size) plt.legend(fontsize=font_size - 3, ncol=4, bbox_to_anchor=(0.5, 1.2), loc='upper center') plt.gca().yaxis.set_major_formatter(FuncFormatter(format_ticks)) plt.ylim(bottom=0, top=600 * 1000) plt.xlim(0, 110) plt.grid() plt.tight_layout() plt.subplots_adjust(top=0.8) # Adjust to leave space for the legend above plt.savefig(f"daily_jobs.{format}", bbox_inches='tight') # Plot 2: Number of daily templates plt.figure(figsize=(10, 4)) plt.tick_params(axis='both', labelsize=font_size - 2) plt.plot(presto_df['days_elapsed'], presto_df['daily_templates'], label="Presto", color='blue', linestyle='--') plt.plot(spark_df['days_elapsed'], spark_df['daily_templates'], label="Spark", color='orange', linestyle='--') plot_manual_linear_fit(presto_df['days_elapsed'].values, presto_df['daily_templates'].values, "Trend line", plt.gca(), 'blue') plot_manual_linear_fit(spark_df['days_elapsed'].values, spark_df['daily_templates'].values, "Trend line", plt.gca(), 'orange') plt.text(50, 50*1000, "20% annual increase", fontsize=font_size - 2, color='red') plt.annotate('', xy=(58, 105 * 1000), xytext=(55, 60 * 1000), arrowprops=dict(arrowstyle="->", color='red', lw=2)) plt.annotate('', xy=(60, 35 * 1000), xytext=(55, 46 * 1000), arrowprops=dict(arrowstyle="->", color='red', lw=2)) plt.xlabel("Day", fontsize=font_size) plt.ylabel("# of Daily Templates", fontsize=font_size) plt.legend(fontsize=font_size - 3, ncol=4, bbox_to_anchor=(0.5, 1.2), loc='upper center') plt.gca().yaxis.set_major_formatter(FuncFormatter(format_ticks)) plt.ylim(bottom=0, top=150 * 1000) plt.xlim(0, 110) plt.grid() plt.tight_layout() plt.savefig(f"daily_templates.{format}", bbox_inches='tight') # Plot 3: Daily traffic volume (read/write) plt.figure(figsize=(10, 3.5)) plt.tick_params(axis='both', labelsize=font_size - 2) plt.plot(presto_df['days_elapsed'], presto_df['daily_read_volume'] / 1024 ** 5, label="Presto", color='blue', linestyle='--') plot_manual_linear_fit(presto_df['days_elapsed'].values, presto_df['daily_read_volume'].values / 1024 ** 5, "Trend line", plt.gca(), 'blue') plt.plot(spark_df['days_elapsed'], (spark_df['daily_read_volume'] + spark_df['daily_write_volume']) / 1024 ** 5, label="Spark", color='orange', linestyle='--') plot_manual_linear_fit(spark_df['days_elapsed'].values, (spark_df['daily_read_volume'] + spark_df['daily_write_volume']).values / 1024 ** 5, "Trend line", plt.gca(), 'orange') # plt.plot(spark_df['days_elapsed'], spark_df['daily_write_volume'] / 1024 ** 5, label="Spark Write", color='purple', linestyle='--') plt.text(50, 55, "30% annual increase", fontsize=font_size - 2, color='red') plt.annotate('', xy=(58, 105), xytext=(55, 65), arrowprops=dict(arrowstyle="->", color='red', lw=2)) plt.annotate('', xy=(60, 25), xytext=(55, 50), arrowprops=dict(arrowstyle="->", color='red', lw=2)) plt.xlabel("Day", fontsize=font_size) plt.ylabel("Daily Traffic (PB)", fontsize=font_size) #plt.legend(fontsize=font_size - 3, ncol=2) plt.gca().yaxis.set_major_formatter(FuncFormatter(format_ticks)) plt.ylim(bottom=0, top=150) plt.xlim(0, 110) plt.grid() plt.tight_layout() plt.savefig(f"daily_traffic_volume.{format}", bbox_inches='tight') def draw_PR_heuristics(double=True): # Define heuristics and data heuristics = [ "No\nRep", "Access Traffic\nVolume", "Inverse\nDataset Size", "Job Access\nFrequency", "Access Traffic\nDensity", "Job Access\nDensity (Moirai)" ] replication_times = [(93+254+94)/3, (120+181+15)/3, (20+111+11)/3, (11+87+34)/3, (55+119+48)/3, (2+3+1)/3] # Approximate from table (h) num_edges = [1252, 1102, 855, 763, 819, 509] # In K num_jobs = [356, 350, 307, 322, 330, 256] # In K num_tables = [134, 133, 113, 133, 133, 119] # In K x = np.arange(len(heuristics)) # the label locations if double: width = 0.25 fig, axes = plt.subplots(2, 1, figsize=(15, 6), sharex=True) ax1, ax2 = axes ax1.bar(x, replication_times, width, color='skyblue') ax1.set_ylabel("Optimization\nTime (hr)") ax1.set_ylim(0, 168) ax1.grid(axis='y') ax2.bar(x - width, num_edges, width, label="# Edges", color='salmon', hatch='/') ax2.bar(x, num_jobs, width, label="# Jobs", color='mediumseagreen', hatch='\\') ax2.bar(x + width, num_tables, width, label="# Tables", color='mediumpurple', hatch='|') ax2.set_ylabel("Count (K)") ax2.set_ylim(0, 1500) ax2.set_xticks(x) ax2.set_xticklabels(heuristics) ax2.grid(axis='y', linestyle='--', alpha=0.6) ax2.legend() else: width = 0.18 fig, ax1 = plt.subplots(figsize=(14, 4.5)) ax2 = ax1.twinx() # Adjust positions so all 4 bars are shown for each heuristic ax1.bar(x - 1.7 * width, replication_times, width, label="Optimization Time", color=colors_default[0]) ax1.legend(loc='upper left', fontsize=font_size + 3) for i, time in enumerate(replication_times): ax1.text( x[i] - 2.4 * width if (i == 0 or i == 1) else x[i] - 2 * width, time + 3, # position slightly above the bar f"{round(time)}hr", ha='center', va='bottom', fontsize=font_size + 1 ) ax2.bar(x - 0.4 * width, num_edges, width, label="# Edges", color=colors_default[1], hatch='/') ax2.bar(x + 0.6 * width, num_jobs, width, label="# Jobs", color=colors_default[2], hatch='\\') ax2.bar(x + 1.6 * width, num_tables, width, label="# Tables", color=colors_default[3], hatch='|') ax2.legend(loc='upper right', fontsize=font_size+3) ax1.set_ylabel("Optimization\nTime (hr)", fontsize=font_size + 2) ax1.set_ylim(0, 250) # ax1.grid(axis='y') yticks = ax1.get_yticks() ax1.set_yticks(yticks) ax1.set_yticklabels([int(tick) for tick in yticks], fontsize=font_size + 2) ax2.set_ylabel("Count (K)", fontsize=font_size + 2) ax2.set_ylim(0, 1500) ax1.set_xticks(x) ax1.set_xticklabels(heuristics, rotation=10) ax2.grid(axis='y', linestyle='--', alpha=0.6) ax2.set_yticklabels([int(x) for x in ax2.get_yticks()], fontsize=font_size + 2) # # Merge legends from both axes # bars, labels = [], [] # for ax in [ax1, ax2]: # h, l = ax.get_legend_handles_labels() # bars += h # labels += l # ax2.legend(bars, labels, loc='upper right', fontsize=font_size + 2) plt.tight_layout() plt.savefig(f"optimization_time_{double}.pdf", bbox_inches='tight') def draw_edges_cdf(): def calculate_percentiles(data, percentiles): if len(data) > 0: return np.percentile(data, percentiles) else: return [np.nan] * len(percentiles) def sample_cdf(data, num_points=1000): """Compute CDF and sample it at `num_points` evenly spaced intervals.""" if len(data) == 0: return np.array([]), np.array([]) sorted_data = np.sort(data) cdf = np.arange(1, len(sorted_data) + 1) / len(sorted_data) # Select `num_points` evenly spaced indices sample_indices = np.linspace(0, len(sorted_data) - 1, num_points, dtype=int) return sorted_data[sample_indices], cdf[sample_indices] def calculate_cdf(data): """Calculate CDF from data.""" sorted_data = np.sort(data) cdf = np.arange(1, len(sorted_data) + 1) / len(sorted_data) return sorted_data, cdf cdf_cache_file = "cdf_results.csv" if os.path.exists(cdf_cache_file): print("Loading cached CDF results...") cdf_results = pd.read_csv(cdf_cache_file) presto_x = cdf_results['presto_x'].dropna().values presto_cdf = cdf_results['presto_cdf'].dropna().values spark_x = cdf_results['spark_x'].dropna().values spark_cdf = cdf_results['spark_cdf'].dropna().values table_presto_x = cdf_results['table_presto_x'].dropna().values table_presto_cdf = cdf_results['table_presto_cdf'].dropna().values table_spark_x = cdf_results['table_spark_x'].dropna().values table_spark_cdf = cdf_results['table_spark_cdf'].dropna().values else: print("Computing CDFs...") start_date = datetime(2024, 10, 22) end_date = datetime(2024, 10, 28) job_presto_counts = [] job_spark_counts = [] for date in date_range(start=start_date, end=end_date, freq='D'): print("Processing", date.strftime("%Y-%m-%d")) presto_path = f"../jobTraces/{date.strftime('%Y%m%d')}-Presto.csv" spark_path = f"../jobTraces/{date.strftime('%Y%m%d')}-Spark.csv" if os.path.exists(presto_path): presto_df = pd.read_csv(presto_path) job_presto_counts.extend(presto_df.groupby('job_id')[['db_name', 'table_name']].nunique().sum(axis=1)) else: print(f"Missing file: {presto_path}") if os.path.exists(spark_path): spark_df = pd.read_csv(spark_path) job_spark_counts.extend(spark_df.groupby('job_id')[['db_name', 'table_name']].nunique().sum(axis=1)) else: print(f"Missing file: {spark_path}") table_presto_counts = [] table_spark_counts = [] presto_path = f"../newTraces/report-abFP-volume-table-{start_date.strftime('%Y%m%d')}-{end_date.strftime('%Y%m%d')}-Presto.csv" spark_path = f"../newTraces/report-abFP-volume-table-{start_date.strftime('%Y%m%d')}-{end_date.strftime('%Y%m%d')}-Spark.csv" if os.path.exists(presto_path): presto_df = pd.read_csv(presto_path) table_presto_counts.extend( presto_df.groupby('abstractFingerPrint')[['db_name', 'table_name']].nunique().sum(axis=1)) else: print(f"Missing file: {presto_path}") if os.path.exists(spark_path): spark_df = pd.read_csv(spark_path) table_spark_counts.extend( spark_df.groupby('abstractFingerPrint')[['db_name', 'table_name']].nunique().sum(axis=1)) else: print(f"Missing file: {spark_path}") # Compute CDFs presto_x, presto_cdf = sample_cdf(job_presto_counts) spark_x, spark_cdf = sample_cdf(job_spark_counts) table_presto_x, table_presto_cdf = sample_cdf(table_presto_counts) table_spark_x, table_spark_cdf = sample_cdf(table_spark_counts) # Define percentiles to compute (P10 to P100 in steps of 5) percentiles = np.arange(10, 101, 5) # Data (assumed loaded from the script) distributions = { "Presto Job": presto_x, "Spark Job": spark_x, "Presto Template": table_presto_x, "Spark Template": table_spark_x } # Compute percentiles percentile_results = { dist_name: calculate_percentiles(data, percentiles) for dist_name, data in distributions.items() } # Convert to DataFrame for display percentile_df = pd.DataFrame(percentile_results, index=[f"P{p}" for p in percentiles]) percentile_df.to_csv("cdf_percentiles.csv", index=True) # # Save CDF results # cdf_df = pd.DataFrame({ # 'presto_x': np.pad(presto_x, (0, max(0, len(table_presto_x) - len(presto_x))), 'constant', constant_values=np.nan), # 'presto_cdf': np.pad(presto_cdf, (0, max(0, len(table_presto_cdf) - len(presto_cdf))), 'constant', constant_values=np.nan), # 'spark_x': np.pad(spark_x, (0, max(0, len(table_spark_x) - len(spark_x))), 'constant', constant_values=np.nan), # 'spark_cdf': np.pad(spark_cdf, (0, max(0, len(table_spark_cdf) - len(spark_cdf))), 'constant', constant_values=np.nan), # 'table_presto_x': table_presto_x, # 'table_presto_cdf': table_presto_cdf, # 'table_spark_x': table_spark_x, # 'table_spark_cdf': table_spark_cdf # }) # cdf_df.to_csv(cdf_cache_file, index=False) # print("CDF results saved.") # Calculate mean values mean_presto_jobs = np.mean(presto_x) if len(presto_x) > 0 else 0 mean_spark_jobs = np.mean(spark_x) if len(spark_x) > 0 else 0 mean_presto_tables = np.mean(table_presto_x) if len(table_presto_x) > 0 else 0 mean_spark_tables = np.mean(table_spark_x) if len(table_spark_x) > 0 else 0 print(f"Mean # of Tables per Presto Job: {mean_presto_jobs:.2f}") print(f"Mean # of Tables per Spark Job: {mean_spark_jobs:.2f}") print(f"Mean # of Tables per Presto Template: {mean_presto_tables:.2f}") print(f"Mean # of Tables per Spark Template: {mean_spark_tables:.2f}") # Plot job-level CDF plt.figure(figsize=(8, 5)) plt.plot(presto_x, presto_cdf, label="Presto", linestyle='-', marker='.') plt.plot(spark_x, spark_cdf, label="Spark", linestyle='-', marker='.') plt.xscale("log") # Set x-axis to log scale plt.xlabel("# of Tables per Job") plt.ylabel("Fraction of jobs (CDF)") plt.legend() plt.grid() plt.tight_layout() plt.savefig("degree_cdf_job.pdf") # Plot table-level CDF plt.figure(figsize=(8, 5)) plt.plot(table_presto_x, table_presto_cdf, label="Presto", linestyle='-', marker='.') plt.plot(table_spark_x, table_spark_cdf, label="Spark", linestyle='-', marker='.') plt.xscale("log") # Set x-axis to log scale plt.xlabel("# of Tables per Template") plt.ylabel("Fraction of jobs (CDF)") plt.legend() plt.grid() plt.tight_layout() plt.savefig("degree_cdf_template.pdf") def draw_reorg(): dfs = { "Redistribution cost unaware": pd.read_csv("../long_term/placement_unaware.csv"), "Moirai": pd.read_csv("../long_term/placement_moirai.csv"), } fig, ax = plt.subplots(figsize=(11, 5)) # Define line styles and markers for black-and-white friendly plotting line_styles = ['--', '--', '-', '-'] # Placeholder for cumulative data cumulative_data = { "Redistribution cost unaware (Ingress)": [], "Redistribution cost unaware (Egress)": [], "Moirai (Ingress)": [], "Moirai (Egress)": [] } for label, df in dfs.items(): # Convert Ingress and Egress to bytes df['Ingress_Bytes'] = df['Ingress'].apply(parse_size).cumsum() df['Egress_Bytes'] = df['Egress'].apply(parse_size).cumsum() cumulative_data[f"{label} (Ingress)"] = df['Ingress_Bytes'] cumulative_data[f"{label} (Egress)"] = df['Egress_Bytes'] # Plot the Ingress and Egress data ax.plot(df['On-premises'], df['Ingress_Bytes'], label=f'{label} (ingress)', linestyle=line_styles.pop(0), color='blue', linewidth=2, marker='x', markersize=12) ax.plot(df['On-premises'], df['Egress_Bytes'], label=f'{label} (egress)', linestyle=line_styles.pop(0), color='green', linewidth=2, marker='*', markersize=12) if label == "Redistribution cost unaware": best_case_x = df['On-premises'] best_case_y = (1 - df['On-premises'] / 100) * (299.12 * 1024 ** 5) ax.plot(best_case_x, best_case_y, 'r-', label='Best case (ingress)', linewidth=6) # Customize the plot ax.set_xlabel('On-premises Storage Space / Total Data Size (%)', fontsize=font_size) ax.set_ylabel('Traffic Volume (PB)', fontsize=font_size) ax.tick_params(axis='x', labelsize=font_size - 2) ax.tick_params(axis='y', labelsize=font_size - 2) ax.grid(True) # Add legend ax.legend(loc='upper center', bbox_to_anchor=(0.5, 1.35), fontsize=font_size - 2, ncol=2, frameon=False) # Set y-axis ticks and labels ticks = [x for x in range(0, 55, 5)] yticks = [i * 10 * 1024 ** 5 for i in ticks] ytick_labels = ["0"] + [f"{i * 10}" if i % 2 == 0 else "" for i in ticks[1:]] ax.set_ylim(ymax=450 * 1024 ** 5) ax.set_yticks(yticks) ax.set_yticklabels(ytick_labels, fontsize=font_size - 2) ax.set_xlim(0, 90) ax.set_xticks([90, 80, 70, 60, 50, 40, 30, 20, 10, 0]) ax.set_xticklabels(["90%", "80%", "70%", "60%", "50%", "40%", "30%", "20%", "10%", "0%"], fontsize=font_size - 2) # Invert x-axis to start from large to low ax.invert_xaxis() plt.tight_layout() plt.savefig('migration.pdf', bbox_inches='tight') if __name__ == '__main__': replication_effects() draw_traffic_rate(single=False) draw_job_routing() draw_growth(format="pdf") draw_overall_new(front=True) draw_overall_new(job=True) draw_overall_new() draw_overall_new(pr=True) draw_PR_heuristics(double=False) draw_edges_cdf() draw_reorg() """ was not used in submission, for debugging """ # verify_traffic_rate(yugong=True) # for week_id in range(9, 14): # plot_weekly_traffic(week_id=week_id, yugong=True) # pass