benchmarks/db-benchmark/groupby-datafusion.py (166 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. #!/usr/bin/env python print("# groupby-datafusion.py", flush=True) import os import gc import timeit import datafusion as df from datafusion import functions as f from datafusion import col from pyarrow import csv as pacsv # exec(open("./_helpers/helpers.py").read()) def ans_shape(batches): rows, cols = 0, 0 for batch in batches: rows += batch.num_rows if cols == 0: cols = batch.num_columns else: assert(cols == batch.num_columns) return rows, cols # ver = df.__version__ ver = "7.0.0" git = "" task = "groupby" solution = "datafusion" fun = ".groupby" cache = "TRUE" on_disk = "FALSE" data_name = os.environ["SRC_DATANAME"] src_grp = os.path.join("data", data_name + ".csv") print("loading dataset %s" % src_grp, flush=True) data = pacsv.read_csv(src_grp, convert_options=pacsv.ConvertOptions(auto_dict_encode=True)) print("dataset loaded") ctx = df.ExecutionContext() ctx.register_record_batches("x", [data.to_batches()]) print("registered record batches") # cols = ctx.sql("SHOW columns from x") # ans.show() in_rows = data.num_rows # print(in_rows, flush=True) task_init = timeit.default_timer() question = "sum v1 by id1" # q1 gc.collect() t_start = timeit.default_timer() ans = ctx.sql("SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1").collect() shape = ans_shape(ans) # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q1: {t}") # m = memory_usage() t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0] chkt = timeit.default_timer() - t_start # write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() question = "sum v1 by id1:id2" # q2 gc.collect() t_start = timeit.default_timer() ans = ctx.sql("SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2").collect() shape = ans_shape(ans) # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q2: {t}") # m = memory_usage() t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0] chkt = timeit.default_timer() - t_start # write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() question = "sum v1 mean v3 by id3" # q3 gc.collect() t_start = timeit.default_timer() ans = ctx.sql("SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3").collect() shape = ans_shape(ans) # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q3: {t}") # m = memory_usage() t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v3"))]).collect()[0].to_pandas().to_numpy()[0] chkt = timeit.default_timer() - t_start # write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() question = "mean v1:v3 by id4" # q4 gc.collect() t_start = timeit.default_timer() ans = ctx.sql("SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM x GROUP BY id4").collect() shape = ans_shape(ans) # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q4: {t}") # m = memory_usage() t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2")), f.sum(col("v3"))]).collect()[0].to_pandas().to_numpy()[0] chkt = timeit.default_timer() - t_start # write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() question = "sum v1:v3 by id6" # q5 gc.collect() t_start = timeit.default_timer() ans = ctx.sql("SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM x GROUP BY id6").collect() shape = ans_shape(ans) # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q5: {t}") # m = memory_usage() t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2")), f.sum(col("v3"))]).collect()[0].to_pandas().to_numpy()[0] chkt = timeit.default_timer() - t_start # write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() question = "median v3 sd v3 by id4 id5" # q6 gc.collect() t_start = timeit.default_timer() ans = ctx.sql("SELECT id4, id5, approx_percentile_cont(v3, .5) AS median_v3, stddev(v3) AS stddev_v3 FROM x GROUP BY id4, id5").collect() shape = ans_shape(ans) # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q6: {t}") # m = memory_usage() t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("median_v3")), f.sum(col("stddev_v3"))]).collect()[0].to_pandas().to_numpy()[0] chkt = timeit.default_timer() - t_start # write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() question = "max v1 - min v2 by id3" # q7 gc.collect() t_start = timeit.default_timer() ans = ctx.sql("SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM x GROUP BY id3").collect() shape = ans_shape(ans) # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q7: {t}") # m = memory_usage() t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("range_v1_v2"))]).collect()[0].column(0)[0] chkt = timeit.default_timer() - t_start # write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() question = "largest two v3 by id6" # q8 gc.collect() t_start = timeit.default_timer() ans = ctx.sql("SELECT id6, v3 from (SELECT id6, v3, row_number() OVER (PARTITION BY id6 ORDER BY v3 DESC) AS row FROM x) t WHERE row <= 2").collect() shape = ans_shape(ans) # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q8: {t}") # m = memory_usage() t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v3"))]).collect()[0].column(0)[0] chkt = timeit.default_timer() - t_start # write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() question = "regression v1 v2 by id2 id4" # q9 gc.collect() t_start = timeit.default_timer() ans = ctx.sql("SELECT corr(v1, v2) as corr FROM x GROUP BY id2, id4").collect() shape = ans_shape(ans) # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q9: {t}") # m = memory_usage() t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("corr"))]).collect()[0].column(0)[0] chkt = timeit.default_timer() - t_start # write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() question = "sum v3 count by id1:id6" # q10 gc.collect() t_start = timeit.default_timer() ans = ctx.sql("SELECT id1, id2, id3, id4, id5, id6, SUM(v3) as v3, COUNT(*) AS cnt FROM x GROUP BY id1, id2, id3, id4, id5, id6").collect() shape = ans_shape(ans) # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q10: {t}") # m = memory_usage() t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v3")), f.sum(col("cnt"))]).collect()[0].to_pandas().to_numpy()[0] chkt = timeit.default_timer() - t_start # write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() print("grouping finished, took %0.fs" % (timeit.default_timer() - task_init), flush=True) exit(0)