in k8s/bench_toolbox.py [0:0]
# imports this function relies on (the full module presumably imports these at
# the top of the file; `newest_file` and `pricing` are assumed to be local helpers)
import glob
import json
import os
import re
import subprocess
import time

import datafusion
import ec2_metadata
import pandas as pd


def results(data_path, data_device, scale_factor, output_path):
    # load the most recent result file from each engine
    df_result_path = newest_file(
        glob.glob(os.path.join(output_path, "datafusion-ray*json"))
    )
    with open(df_result_path) as f:
        df_result = json.load(f)
    spark_result_path = newest_file(
        glob.glob(os.path.join(output_path, "spark-tpch*json"))
    )
    with open(spark_result_path) as f:
        spark_result = json.load(f)
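    # `newest_file` is assumed to be a helper defined elsewhere in this module;
    # a minimal sketch of the assumed behavior (most recently modified path wins):
    #
    #     def newest_file(paths):
    #         return max(paths, key=os.path.getmtime)

    # echo the raw result payloads for debugging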
print(df_result)
print(spark_result)
total_results = {"spark": spark_result, "df-ray": df_result}
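    # per-query wall-clock durations (in seconds) for TPC-H queries 1-22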
spark = [spark_result["queries"][f"{i}"] for i in range(1, 23)]
df_ray = [df_result["queries"][f"{i}"] for i in range(1, 23)]
# add a final row with the totals
spark += [sum(spark)]
df_ray += [sum(df_ray)]
    # costs are computed separately below and reported in the summary header
    # df means "dataframe" here, not "datafusion"; pandas is just used for easy output
df = pd.DataFrame({"spark": spark, "df_ray": df_ray})
df["change"] = df["df_ray"] / df["spark"]
df["change_text"] = df["change"].apply(
lambda change: (
f"+{(1 / change):.2f}x faster" if change < 1.0 else f" {change:.2f}x slower"
)
)
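    # worked example of the mapping above: change = df_ray / spark, so a change
    # of 0.50 renders as "+2.00x faster" (df-ray took half the time) and a
    # change of 1.25 renders as " 1.25x slower"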
df["tpch_query"] = [f"{i}" for i in range(1, 23)] + ["total"]
df["sort_index"] = list(range(1, 24))
df["spark"] = df["spark"].apply(lambda s: f"{s:>10.4f}")
df["df_ray"] = df["df_ray"].apply(lambda s: f"{s:>10.4f}")
df["change"] = df["change"].apply(lambda s: f"{s:>21.4f}")
df["change_text"] = df["change_text"].apply(lambda s: f"{s:>14}")
    # round-trip the comparison through parquet so DataFusion can print it below
    ts = time.time()
    comparison_path = f"datafusion-ray-spark-comparison-{ts}.parquet"
    df.to_parquet(comparison_path)
    ctx = datafusion.SessionContext()
    ctx.register_parquet("results", comparison_path)
    # full CPU model name
    cpu = subprocess.run(
        "lscpu | grep '^Model name' | sed 's/^Model name:[[:space:]]*//'",
        shell=True,
        capture_output=True,
        text=True,
    ).stdout.strip()
    # number of logical CPUs
    quantity = subprocess.run(
        "lscpu | grep '^CPU(s):' | awk '{print $2}'",
        shell=True,
        capture_output=True,
        text=True,
    ).stdout.strip()
    # total online memory, e.g. "32G"
    memory = subprocess.run(
        "lsmem | grep 'Total online' | awk '{print $4}'",
        shell=True,
        capture_output=True,
        text=True,
    ).stdout.strip()
    # measure raw sequential read throughput of the data device
    hdresults = subprocess.run(
        f"sudo hdparm -t {data_device} | grep 'MB/sec'",
        shell=True,
        capture_output=True,
        text=True,
    ).stdout.strip()
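    # hdparm -t output typically contains a line like
    #   "Timing buffered disk reads: 1024 MB in  3.00 seconds = 341.33 MB/sec"
    # and the regex below captures the trailing MB/sec figure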
hdresult = re.search(r"([\d\.]+) MB/sec", hdresults, re.MULTILINE).group(1)
    machine = ec2_metadata.ec2_metadata.instance_type
    # use the public on-demand price: reserved pricing would include any
    # discounts tied to your EC2 credentials, so it is not appropriate to share
    hourly_cost = pricing.get_on_demand_price("us-east-1", machine)
spark_cost = spark[-1] / 3600 * hourly_cost
df_ray_cost = df_ray[-1] / 3600 * hourly_cost
cost_delta = df_ray_cost / spark_cost
cost_delta_text = (
f"+{(1 / cost_delta):.2f}x cheaper"
if cost_delta < 1.0
else f"{cost_delta:.2f}x more expensive"
)
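    # worked example of the cost arithmetic above: on a $2.00/hr instance, a
    # 360 s total run costs 360 / 3600 * 2.00 = $0.20; cost_delta then compares
    # the two engines' dollar costs the same way change compares their durations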
speed_delta_text = df["change_text"].iloc[-1]
df_ray_cost = f"${df_ray_cost:.4f}"
df_ray_duration = f"{df_ray[-1]:.2f}s"
spark_cost = f"${spark_cost:.4f}"
spark_duration = f"{spark[-1]:.2f}s"
print("=" * 81)
# the formatting code is terrible here, but it works for now
header = [
"Spark and DataFusionRay TPCH 100 Benchmarks",
f"{'Machine:':<30}{machine}",
f"{'Machine On Demand Cost:':<30}{hourly_cost} $/hr",
f"{'CPU(s):':<30}{cpu} {quantity}x",
f"{'MEM:':<30}{memory}",
f"{'HD Throughput:':<30}{hdresult} MB/s (from hdparm)",
f"{'Data Location:':<30}{data_path}/sf{scale_factor}",
"",
f"{'df-ray duration:':<30}{df_ray_duration:>10} {speed_delta_text}",
f"{'df-ray cost:':<30}{df_ray_cost:>10} {cost_delta_text}",
"",
f"{'spark duration:':<30}{spark_duration:>10}",
f"{'spark cost:':<30}{spark_cost:>10}",
"",
"DataFusionRay Settings:",
f"{'concurrency:':<30}{df_result['settings']['concurrency']:>10}",
f"{'batch_size :':<30}{df_result['settings']['batch_size']:>10}",
f"{'partitions_per_processor:':<30}{df_result['settings']['partitions_per_processor']:>10}",
f"{'Ray Workers:':<30}{spark_result['spark_conf']['spark.executor.instances']:>10}",
f"{'Ray Worker Mem (GB):':<30}{int(spark_result['spark_conf']['spark.executor.memory'][:-1]) + int(spark_result['spark_conf']['spark.executor.memoryOverhead'][:-1]):>10}",
f"{'Ray Worker CPU:':<30}{spark_result['spark_conf']['spark.executor.cores']:>10}",
f"{'Ray Head Mem (GB):':<30}{int(spark_result['spark_conf']['spark.driver.memory'][:-1]):>10}",
f"{'Ray Head CPU:':<30}{spark_result['spark_conf']['spark.driver.cores']:>10}",
"",
"Spark Settings:",
f"{'Executors:':<30}{spark_result['spark_conf']['spark.executor.instances']:>10}",
f"{'Executor Mem (GB):':<30}{int(spark_result['spark_conf']['spark.executor.memory'][:-1]):>10}",
f"{'Executor Overhead Mem (GB):':<30}{int(spark_result['spark_conf']['spark.executor.memoryOverhead'][:-1]):>10}",
f"{'Executor CPU:':<30}{spark_result['spark_conf']['spark.executor.cores']:>10}",
f"{'Driver Mem(GB):':<30}{int(spark_result['spark_conf']['spark.driver.memory'][:-1]):>10}",
f"{'Driver CPU:':<30}{spark_result['spark_conf']['spark.driver.cores']:>10}",
]
for h in header:
print(h)
print("=" * 81)
ctx.sql(
'select tpch_query, spark, df_ray, change as "change(=df_ray/spark)", change_text from results order by sort_index asc'
).show(num=100)
    out_path = f"datafusion-ray-spark-comparison-{ts}.json"
    with open(out_path, "w") as f:
        f.write(json.dumps(total_results))