def results()

in k8s/bench_toolbox.py [0:0]

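# The import block below is an assumption reconstructed from how this excerpt
# uses its names; the real imports at the top of k8s/bench_toolbox.py are not
# shown here.  newest_file() and pricing are local helpers from the toolbox
# (pricing.get_on_demand_price is taken to return a USD-per-hour figure), and
# ec2_metadata is the third-party ec2-metadata package.
import glob
import json
import os
import re
import subprocess
import time

import datafusion
import ec2_metadata
import pandas as pd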

def results(data_path, data_device, scale_factor, output_path):
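    """Summarize the newest DataFusionRay and Spark TPC-H benchmark results.

    Loads the most recent datafusion-ray*json and spark-tpch*json files from
    output_path, builds a per-query comparison (with totals and a cost estimate
    based on the machine's EC2 on-demand price), prints a summary header and
    table, and writes the combined results to parquet and JSON files.
    """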
    df_result_path = newest_file(
        glob.glob(os.path.join(output_path, "datafusion-ray*json"))
    )
    with open(df_result_path) as f:
        df_result = json.load(f)

    spark_result_path = newest_file(
        glob.glob(os.path.join(output_path, "spark-tpch*json"))
    )
    with open(spark_result_path) as f:
        spark_result = json.load(f)
    print(df_result)
    print(spark_result)
    total_results = {"spark": spark_result, "df-ray": df_result}

    spark = [spark_result["queries"][f"{i}"] for i in range(1, 23)]
    df_ray = [df_result["queries"][f"{i}"] for i in range(1, 23)]

    # add a final row with the totals
    spark += [sum(spark)]
    df_ray += [sum(df_ray)]

    # costs are computed further below and reported in the summary header, not as a row

    # df for "dataframe" here, not "datafusion".  Just using pandas for easy output
    df = pd.DataFrame({"spark": spark, "df_ray": df_ray})
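    # change is the ratio df_ray / spark: values below 1.0 mean DataFusionRay
    # finished the query faster than Spark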
    df["change"] = df["df_ray"] / df["spark"]

    df["change_text"] = df["change"].apply(
        lambda change: (
            f"+{(1 / change):.2f}x faster" if change < 1.0 else f" {change:.2f}x slower"
        )
    )
    df["tpch_query"] = [f"{i}" for i in range(1, 23)] + ["total"]
    df["sort_index"] = list(range(1, 24))

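    # convert the numeric columns to right-aligned strings; from here on they are
    # only used for display in the final table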
    df["spark"] = df["spark"].apply(lambda s: f"{s:>10.4f}")
    df["df_ray"] = df["df_ray"].apply(lambda s: f"{s:>10.4f}")
    df["change"] = df["change"].apply(lambda s: f"{s:>21.4f}")
    df["change_text"] = df["change_text"].apply(lambda s: f"{s:>14}")

    ts = time.time()
    df.to_parquet(f"datafusion-ray-spark-comparison-{ts}.parquet")
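    # register the comparison parquet with DataFusion so it can be pretty-printed
    # via a SQL query at the end of this function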
    ctx = datafusion.SessionContext()
    ctx.register_parquet("results", f"datafusion-ray-spark-comparison-{ts}.parquet")

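    # scrape basic host specs for the report: CPU model and core count via lscpu,
    # total memory via lsmem, and raw disk read throughput via `hdparm -t`
    # (the hdparm call requires sudo)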
    cpu = subprocess.run(
        "lscpu | grep 'Model name' |awk '{print $3}'",
        shell=True,
        capture_output=True,
        text=True,
    ).stdout.strip()
    quantity = subprocess.run(
        "lscpu | grep '^CPU(s):' |awk '{print $2}'",
        shell=True,
        capture_output=True,
        text=True,
    ).stdout.strip()
    memory = subprocess.run(
        "lsmem | grep 'Total online' |awk '{print $4}'",
        shell=True,
        capture_output=True,
        text=True,
    ).stdout.strip()
    hdresults = subprocess.run(
        f"sudo hdparm -t {data_device}|grep 'MB/sec'",
        shell=True,
        capture_output=True,
        text=True,
    ).stdout.strip()

    hdresult = re.search(r"([\d\.]+) MB/sec", hdresults, re.MULTILINE).group(1)

    machine = ec2_metadata.ec2_metadata.instance_type

    # if you get reserved it includes any discounts you may have associated
    # with your ec2 credentials.  So a public price is appropriate for sharing
    hourly_cost = pricing.get_on_demand_price("us-east-1", machine)

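    # dollar cost of each run: total wall-clock seconds / 3600 * on-demand $/hr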
    spark_cost = spark[-1] / 3600 * hourly_cost
    df_ray_cost = df_ray[-1] / 3600 * hourly_cost

    cost_delta = df_ray_cost / spark_cost
    cost_delta_text = (
        f"+{(1 / cost_delta):.2f}x cheaper"
        if cost_delta < 1.0
        else f"{cost_delta:.2f}x more expensive"
    )
    speed_delta_text = df["change_text"].iloc[-1]

    df_ray_cost = f"${df_ray_cost:.4f}"
    df_ray_duration = f"{df_ray[-1]:.2f}s"
    spark_cost = f"${spark_cost:.4f}"
    spark_duration = f"{spark[-1]:.2f}s"

    print("=" * 81)
    # the formatting code is terrible here, but it works for now
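    # note: the Ray worker/head figures in the header below are read from the
    # Spark conf, presumably so both engines are described with identical resources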
    header = [
        f"Spark and DataFusionRay TPCH {scale_factor} Benchmarks",
        f"{'Machine:':<30}{machine}",
        f"{'Machine On Demand Cost:':<30}{hourly_cost} $/hr",
        f"{'CPU(s):':<30}{cpu} {quantity}x",
        f"{'MEM:':<30}{memory}",
        f"{'HD Throughput:':<30}{hdresult} MB/s (from hdparm)",
        f"{'Data Location:':<30}{data_path}/sf{scale_factor}",
        "",
        f"{'df-ray duration:':<30}{df_ray_duration:>10} {speed_delta_text}",
        f"{'df-ray cost:':<30}{df_ray_cost:>10} {cost_delta_text}",
        "",
        f"{'spark duration:':<30}{spark_duration:>10}",
        f"{'spark cost:':<30}{spark_cost:>10}",
        "",
        "DataFusionRay Settings:",
        f"{'concurrency:':<30}{df_result['settings']['concurrency']:>10}",
        f"{'batch_size :':<30}{df_result['settings']['batch_size']:>10}",
        f"{'partitions_per_processor:':<30}{df_result['settings']['partitions_per_processor']:>10}",
        f"{'Ray Workers:':<30}{spark_result['spark_conf']['spark.executor.instances']:>10}",
        f"{'Ray Worker Mem (GB):':<30}{int(spark_result['spark_conf']['spark.executor.memory'][:-1]) + int(spark_result['spark_conf']['spark.executor.memoryOverhead'][:-1]):>10}",
        f"{'Ray Worker CPU:':<30}{spark_result['spark_conf']['spark.executor.cores']:>10}",
        f"{'Ray Head Mem (GB):':<30}{int(spark_result['spark_conf']['spark.driver.memory'][:-1]):>10}",
        f"{'Ray Head CPU:':<30}{spark_result['spark_conf']['spark.driver.cores']:>10}",
        "",
        "Spark Settings:",
        f"{'Executors:':<30}{spark_result['spark_conf']['spark.executor.instances']:>10}",
        f"{'Executor Mem (GB):':<30}{int(spark_result['spark_conf']['spark.executor.memory'][:-1]):>10}",
        f"{'Executor Overhead Mem (GB):':<30}{int(spark_result['spark_conf']['spark.executor.memoryOverhead'][:-1]):>10}",
        f"{'Executor CPU:':<30}{spark_result['spark_conf']['spark.executor.cores']:>10}",
        f"{'Driver Mem(GB):':<30}{int(spark_result['spark_conf']['spark.driver.memory'][:-1]):>10}",
        f"{'Driver CPU:':<30}{spark_result['spark_conf']['spark.driver.cores']:>10}",
    ]
    for h in header:
        print(h)

    print("=" * 81)
    ctx.sql(
        'select tpch_query, spark, df_ray, change as "change(=df_ray/spark)", change_text from results order by sort_index asc'
    ).show(num=100)

    out_path = f"datafusion-ray-spark-comparison-{ts}.json"
    with open(out_path, "w") as f:
        json.dump(total_results, f)
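

# newest_file() is not part of this excerpt.  The sketch below is a guess at
# what it does, based only on how results() calls it; the real implementation
# in k8s/bench_toolbox.py may differ.
def newest_file(paths):
    # results() hands over a glob match list and expects back a single result
    # file, so the natural reading is "latest modification time wins"
    return max(paths, key=os.path.getmtime)


# Hypothetical invocation (paths, device, and scale factor below are
# illustrative only, not taken from the repository):
#
#   results(
#       data_path="/data",
#       data_device="/dev/nvme1n1",
#       scale_factor=100,
#       output_path="/results",
#   )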