k8s/bench_toolbox.py (323 lines of code) (raw):

#!/usr/bin/env python3 import click import cmds import os import pandas as pd import glob import json import re import pricing import time import datafusion import ec2_metadata import subprocess from cmds import Runner runner: Runner | None = None @click.group() @click.option("--dry-run", is_flag=True) @click.option("-v", "--verbose", is_flag=True) def cli(dry_run: bool, verbose: bool): global runner runner = Runner(dry_run, verbose) @cli.command(help="run spark and df ray benchmarks") @click.option( "--executor-cpus", type=int, help="how much cpu to allocate to the executor[ray worker] nodes.", required=True, ) @click.option( "--executor-mem", type=int, help="how much memory (GiB) to allocate to the executor[ray worker] nodes.", required=True, ) @click.option( "--executor-overhead-mem", type=int, help="how much memory (GiB) to allocate to the executor overhead. Not used on ray. Will be subtracted from executor_mem", required=True, ) @click.option( "--executor-num", type=int, help="how many executors[ray workers] to start", required=True, ) @click.option( "--driver-mem", type=int, help="how much memory (GiB) to allocate to the driver[head] node.", required=True, ) @click.option( "--driver-cpus", type=int, help="how much cpu to allocate to the driver[ray head] node.", required=True, ) @click.option( "--scale-factor", type=click.Choice(["1", "10", "100", "1000"]), help="TPCH scale factor", required=True, ) @click.option( "--data-path", type=str, help="path(url) to the directory that holds generated TPCH data. Should be >= 300GB", required=True, ) @click.option( "--output-path", type=str, help="path to the local directory exposed via PVC", required=True, ) @click.option( "--concurrency", type=int, help="DFRay only. The number of target partitions to use in planning", required=True, ) @click.option( "--partitions-per-processor", type=int, help="how many partitions (out of [concurrency] value to host in each DFRayProcessor", required=True, ) @click.option( "--processor-pool-min", type=int, help="minimum number of DFRayProcessrs to allocate in a pool for use by queries", required=True, ) @click.option( "--df-ray-version", type=str, help="version number of DFRay to use", required=True ) @click.option( "--test-pypi", is_flag=True, help="use the test.pypi upload of DFRay", ) @click.option( "--arm", is_flag=True, help="deploy an arm image for ray cluster image", ) @click.argument( "system", type=click.Choice(["spark", "df_ray"]), ) def bench(system, **kwargs): assert runner is not None match system: case "spark": runner.run_commands(cmds.cmds["bench_spark"], kwargs) case "df_ray": runner.run_commands(cmds.cmds["bench_df_ray"], kwargs) case _: print(f"unknown system {system}") exit(1) @click.option( "--data-path", type=str, help="path/url to the directory that holds generated TPCH data. Should be >= 300GB", required=True, ) @click.option( "--output-path", type=str, help="path where outputfiles are written", required=True, ) @click.option( "--data-device", type=str, help="path to the device in /dev/ that holds the data-path. It will be benchmarked with hdparm for throughput.", required=True, ) @click.option( "--scale-factor", type=click.Choice(["1", "10", "100", "1000"]), help="TPCH scale factor", required=True, ) @cli.command(help="assemble the results into a single json") def results(data_path, data_device, scale_factor, output_path): df_result = json.loads( open( newest_file(glob.glob(os.path.join(output_path, "datafusion-ray*json"))) ).read() ) spark_result = json.loads( open(newest_file(glob.glob(os.path.join(output_path, "spark-tpch*json")))).read() ) print(df_result) print(spark_result) total_results = {"spark": spark_result, "df-ray": df_result} spark = [spark_result["queries"][f"{i}"] for i in range(1, 23)] df_ray = [df_result["queries"][f"{i}"] for i in range(1, 23)] # add a final row with the totals spark += [sum(spark)] df_ray += [sum(df_ray)] # add another final row with costs # df for "dataframe" here, not "datafusion". Just using pandas for easy output df = pd.DataFrame({"spark": spark, "df_ray": df_ray}) df["change"] = df["df_ray"] / df["spark"] df["change_text"] = df["change"].apply( lambda change: ( f"+{(1 / change):.2f}x faster" if change < 1.0 else f" {change:.2f}x slower" ) ) df["tpch_query"] = [f"{i}" for i in range(1, 23)] + ["total"] df["sort_index"] = list(range(1, 24)) df["spark"] = df["spark"].apply(lambda s: f"{s:>10.4f}") df["df_ray"] = df["df_ray"].apply(lambda s: f"{s:>10.4f}") df["change"] = df["change"].apply(lambda s: f"{s:>21.4f}") df["change_text"] = df["change_text"].apply(lambda s: f"{s:>14}") ts = time.time() df.to_parquet(f"datafusion-ray-spark-comparison-{ts}.parquet") ctx = datafusion.SessionContext() ctx.register_parquet("results", f"datafusion-ray-spark-comparison-{ts}.parquet") cpu = subprocess.run( "lscpu | grep 'Model name' |awk '{print $3}'", shell=True, capture_output=True, text=True, ).stdout.strip() quantity = subprocess.run( "lscpu | grep '^CPU(s):' |awk '{print $2}'", shell=True, capture_output=True, text=True, ).stdout.strip() memory = subprocess.run( "lsmem | grep 'Total online' |awk '{print $4}'", shell=True, capture_output=True, text=True, ).stdout.strip() hdresults = subprocess.run( f"sudo hdparm -t {data_device}|grep 'MB/sec'", shell=True, capture_output=True, text=True, ).stdout.strip() hdresult = re.search(r"([\d\.]+) MB/sec", hdresults, re.MULTILINE).group(1) machine = ec2_metadata.ec2_metadata.instance_type # if you get reserved it includes any discounts you may have associated # with your ec2 credentials. So a public price is appropriate for sharing hourly_cost = pricing.get_on_demand_price("us-east-1", machine) spark_cost = spark[-1] / 3600 * hourly_cost df_ray_cost = df_ray[-1] / 3600 * hourly_cost cost_delta = df_ray_cost / spark_cost cost_delta_text = ( f"+{(1 / cost_delta):.2f}x cheaper" if cost_delta < 1.0 else f"{cost_delta:.2f}x more expensive" ) speed_delta_text = df["change_text"].iloc[-1] df_ray_cost = f"${df_ray_cost:.4f}" df_ray_duration = f"{df_ray[-1]:.2f}s" spark_cost = f"${spark_cost:.4f}" spark_duration = f"{spark[-1]:.2f}s" print("=" * 81) # the formatting code is terrible here, but it works for now header = [ "Spark and DataFusionRay TPCH 100 Benchmarks", f"{'Machine:':<30}{machine}", f"{'Machine On Demand Cost:':<30}{hourly_cost} $/hr", f"{'CPU(s):':<30}{cpu} {quantity}x", f"{'MEM:':<30}{memory}", f"{'HD Throughput:':<30}{hdresult} MB/s (from hdparm)", f"{'Data Location:':<30}{data_path}/sf{scale_factor}", "", f"{'df-ray duration:':<30}{df_ray_duration:>10} {speed_delta_text}", f"{'df-ray cost:':<30}{df_ray_cost:>10} {cost_delta_text}", "", f"{'spark duration:':<30}{spark_duration:>10}", f"{'spark cost:':<30}{spark_cost:>10}", "", "DataFusionRay Settings:", f"{'concurrency:':<30}{df_result['settings']['concurrency']:>10}", f"{'batch_size :':<30}{df_result['settings']['batch_size']:>10}", f"{'partitions_per_processor:':<30}{df_result['settings']['partitions_per_processor']:>10}", f"{'Ray Workers:':<30}{spark_result['spark_conf']['spark.executor.instances']:>10}", f"{'Ray Worker Mem (GB):':<30}{int(spark_result['spark_conf']['spark.executor.memory'][:-1]) + int(spark_result['spark_conf']['spark.executor.memoryOverhead'][:-1]):>10}", f"{'Ray Worker CPU:':<30}{spark_result['spark_conf']['spark.executor.cores']:>10}", f"{'Ray Head Mem (GB):':<30}{int(spark_result['spark_conf']['spark.driver.memory'][:-1]):>10}", f"{'Ray Head CPU:':<30}{spark_result['spark_conf']['spark.driver.cores']:>10}", "", "Spark Settings:", f"{'Executors:':<30}{spark_result['spark_conf']['spark.executor.instances']:>10}", f"{'Executor Mem (GB):':<30}{int(spark_result['spark_conf']['spark.executor.memory'][:-1]):>10}", f"{'Executor Overhead Mem (GB):':<30}{int(spark_result['spark_conf']['spark.executor.memoryOverhead'][:-1]):>10}", f"{'Executor CPU:':<30}{spark_result['spark_conf']['spark.executor.cores']:>10}", f"{'Driver Mem(GB):':<30}{int(spark_result['spark_conf']['spark.driver.memory'][:-1]):>10}", f"{'Driver CPU:':<30}{spark_result['spark_conf']['spark.driver.cores']:>10}", ] for h in header: print(h) print("=" * 81) ctx.sql( 'select tpch_query, spark, df_ray, change as "change(=df_ray/spark)", change_text from results order by sort_index asc' ).show(num=100) out_path = f"datafusion-ray-spark-comparison-{ts}.json" open(out_path, "w").write(json.dumps(total_results)) @cli.command(help="Install k3s and configure it") @click.option( "--data-path", type=str, help="path to the directory that holds generated TPCH data. Should be >= 300GB", required=True, ) @click.option( "--k3s-url", type=str, help="url to head node of the cluster to join", ) @click.option( "--k3s-token", type=str, help="k3s token to authorize when joining the cluster", ) def k3s(**kwargs): assert runner is not None if kwargs["k3s_url"]: kwargs["k3s_url"] = f"K3S_URL={kwargs['k3s_url']}" if kwargs["k3s_token"]: kwargs["k3s_token"] = f"K3S_TOKEN={kwargs['k3s_token']}" runner.run_commands(cmds.cmds["k3s_setup"], kwargs) @cli.command(help="Generate TPCH data") @click.option( "--data-path", type=str, help="path to the directory that will hold the generated TPCH data. Should be >= 300GB", required=True, ) @click.option( "--scale-factor", type=click.Choice(["1", "10", "100", "1000"]), help="TPCH scale factor", required=True, ) @click.option( "--partitions", type=int, help="TPCH number of partitions for each table", required=True, ) @click.option( "--pool-size", type=int, default=1, help="number of concurrent processors to use. Watch out! too high and machine will lock up from too much memory use", ) def generate(**kwargs): assert runner is not None runner.run_commands(cmds.cmds["generate"], kwargs) @cli.command(help="just testing of toolbox shell commands that are harmless") def echo(): assert runner is not None runner.run_commands(cmds.cmds["echo"]) @cli.command() def help(): """Print the overall help message.""" click.echo(cli.get_help(click.Context(cli))) def newest_file(files: list[str]): return max(files, key=os.path.getctime) if __name__ == "__main__": cli()