in scripts/figs.py [0:0]
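# Module-level imports assumed to appear earlier in scripts/figs.py
# (the function below uses os, datetime, numpy, pandas, and matplotlib):
import os
from datetime import datetime

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd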
def draw_edges_cdf():
    def calculate_percentiles(data, percentiles):
        """Return the requested percentiles of `data`, or NaNs if `data` is empty."""
        if len(data) > 0:
            return np.percentile(data, percentiles)
        else:
            return [np.nan] * len(percentiles)
    def sample_cdf(data, num_points=1000):
        """Compute the empirical CDF and sample it at `num_points` evenly spaced indices."""
        if len(data) == 0:
            return np.array([]), np.array([])
        sorted_data = np.sort(data)
        cdf = np.arange(1, len(sorted_data) + 1) / len(sorted_data)
        # Select `num_points` evenly spaced indices into the sorted data
        sample_indices = np.linspace(0, len(sorted_data) - 1, num_points, dtype=int)
        return sorted_data[sample_indices], cdf[sample_indices]
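    # Sampling the CDF at a fixed number of points keeps the cached CSV and the
    # plots small while preserving the shape of the curve.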
    def calculate_cdf(data):
        """Calculate CDF from data."""
        sorted_data = np.sort(data)
        cdf = np.arange(1, len(sorted_data) + 1) / len(sorted_data)
        return sorted_data, cdf
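    # Note: `calculate_cdf` returns the full-resolution CDF; the code below only
    # uses the downsampled `sample_cdf` variant.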
    cdf_cache_file = "cdf_results.csv"
    if os.path.exists(cdf_cache_file):
        print("Loading cached CDF results...")
        cdf_results = pd.read_csv(cdf_cache_file)
        # dropna() strips the NaN padding added when columns of different
        # lengths were written to the cache file.
        presto_x = cdf_results['presto_x'].dropna().values
        presto_cdf = cdf_results['presto_cdf'].dropna().values
        spark_x = cdf_results['spark_x'].dropna().values
        spark_cdf = cdf_results['spark_cdf'].dropna().values
        table_presto_x = cdf_results['table_presto_x'].dropna().values
        table_presto_cdf = cdf_results['table_presto_cdf'].dropna().values
        table_spark_x = cdf_results['table_spark_x'].dropna().values
        table_spark_cdf = cdf_results['table_spark_cdf'].dropna().values
    else:
        print("Computing CDFs...")
        start_date = datetime(2024, 10, 22)
        end_date = datetime(2024, 10, 28)
        job_presto_counts = []
        job_spark_counts = []
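        # One trace file per engine per day; a job's "degree" is its number of
        # distinct db names plus distinct table names.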
        for date in pd.date_range(start=start_date, end=end_date, freq='D'):
            print("Processing", date.strftime("%Y-%m-%d"))
            presto_path = f"../jobTraces/{date.strftime('%Y%m%d')}-Presto.csv"
            spark_path = f"../jobTraces/{date.strftime('%Y%m%d')}-Spark.csv"
            if os.path.exists(presto_path):
                presto_df = pd.read_csv(presto_path)
                job_presto_counts.extend(
                    presto_df.groupby('job_id')[['db_name', 'table_name']].nunique().sum(axis=1))
            else:
                print(f"Missing file: {presto_path}")
            if os.path.exists(spark_path):
                spark_df = pd.read_csv(spark_path)
                job_spark_counts.extend(
                    spark_df.groupby('job_id')[['db_name', 'table_name']].nunique().sum(axis=1))
            else:
                print(f"Missing file: {spark_path}")
        table_presto_counts = []
        table_spark_counts = []
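        # Template-level degree: the same distinct-db-plus-distinct-table count,
        # grouped by abstract query fingerprint instead of by individual job.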
presto_path = f"../newTraces/report-abFP-volume-table-{start_date.strftime('%Y%m%d')}-{end_date.strftime('%Y%m%d')}-Presto.csv"
spark_path = f"../newTraces/report-abFP-volume-table-{start_date.strftime('%Y%m%d')}-{end_date.strftime('%Y%m%d')}-Spark.csv"
if os.path.exists(presto_path):
presto_df = pd.read_csv(presto_path)
table_presto_counts.extend(
presto_df.groupby('abstractFingerPrint')[['db_name', 'table_name']].nunique().sum(axis=1))
else:
print(f"Missing file: {presto_path}")
if os.path.exists(spark_path):
spark_df = pd.read_csv(spark_path)
table_spark_counts.extend(
spark_df.groupby('abstractFingerPrint')[['db_name', 'table_name']].nunique().sum(axis=1))
else:
print(f"Missing file: {spark_path}")
        # Compute downsampled CDFs
        presto_x, presto_cdf = sample_cdf(job_presto_counts)
        spark_x, spark_cdf = sample_cdf(job_spark_counts)
        table_presto_x, table_presto_cdf = sample_cdf(table_presto_counts)
        table_spark_x, table_spark_cdf = sample_cdf(table_spark_counts)

        # Save CDF results so later runs take the cached fast path above;
        # columns are NaN-padded to a common length to form a DataFrame.
        max_len = max(len(presto_x), len(spark_x), len(table_presto_x), len(table_spark_x))

        def pad(arr):
            arr = np.asarray(arr, dtype=float)
            return np.pad(arr, (0, max_len - len(arr)), constant_values=np.nan)

        cdf_df = pd.DataFrame({
            'presto_x': pad(presto_x),
            'presto_cdf': pad(presto_cdf),
            'spark_x': pad(spark_x),
            'spark_cdf': pad(spark_cdf),
            'table_presto_x': pad(table_presto_x),
            'table_presto_cdf': pad(table_presto_cdf),
            'table_spark_x': pad(table_spark_x),
            'table_spark_cdf': pad(table_spark_cdf)
        })
        cdf_df.to_csv(cdf_cache_file, index=False)
        print("CDF results saved.")
    # Define percentiles to compute (P10 to P100 in steps of 5)
    percentiles = np.arange(10, 101, 5)
    # The four degree distributions, as sampled CDF x-values
    distributions = {
        "Presto Job": presto_x,
        "Spark Job": spark_x,
        "Presto Template": table_presto_x,
        "Spark Template": table_spark_x
    }
    # Compute percentiles for each distribution
    percentile_results = {
        dist_name: calculate_percentiles(data, percentiles)
        for dist_name, data in distributions.items()
    }
    # Convert to a DataFrame (rows P10..P100, one column per distribution)
    percentile_df = pd.DataFrame(percentile_results, index=[f"P{p}" for p in percentiles])
    percentile_df.to_csv("cdf_percentiles.csv", index=True)
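    # Because percentiles are taken over the 1,000 sampled CDF points rather
    # than the raw per-job counts, they are close approximations, not exact.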
    # Calculate mean degree per distribution, again from the sampled CDF points
    # (an approximation of the mean of the raw counts)
    mean_presto_jobs = np.mean(presto_x) if len(presto_x) > 0 else 0
    mean_spark_jobs = np.mean(spark_x) if len(spark_x) > 0 else 0
    mean_presto_tables = np.mean(table_presto_x) if len(table_presto_x) > 0 else 0
    mean_spark_tables = np.mean(table_spark_x) if len(table_spark_x) > 0 else 0
    print(f"Mean # of Tables per Presto Job: {mean_presto_jobs:.2f}")
    print(f"Mean # of Tables per Spark Job: {mean_spark_jobs:.2f}")
    print(f"Mean # of Tables per Presto Template: {mean_presto_tables:.2f}")
    print(f"Mean # of Tables per Spark Template: {mean_spark_tables:.2f}")
    # Plot job-level CDF
    plt.figure(figsize=(8, 5))
    plt.plot(presto_x, presto_cdf, label="Presto", linestyle='-', marker='.')
    plt.plot(spark_x, spark_cdf, label="Spark", linestyle='-', marker='.')
    plt.xscale("log")  # Set x-axis to log scale
    plt.xlabel("# of Tables per Job")
    plt.ylabel("Fraction of jobs (CDF)")
    plt.legend()
    plt.grid()
    plt.tight_layout()
    plt.savefig("degree_cdf_job.pdf")
    # Plot template-level CDF
    plt.figure(figsize=(8, 5))
    plt.plot(table_presto_x, table_presto_cdf, label="Presto", linestyle='-', marker='.')
    plt.plot(table_spark_x, table_spark_cdf, label="Spark", linestyle='-', marker='.')
    plt.xscale("log")  # Set x-axis to log scale
    plt.xlabel("# of Tables per Template")
    plt.ylabel("Fraction of templates (CDF)")
    plt.legend()
    plt.grid()
    plt.tight_layout()
    plt.savefig("degree_cdf_template.pdf")