benchmarks/db-benchmark/groupby-datafusion.py (166 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#!/usr/bin/env python
print("# groupby-datafusion.py", flush=True)
import os
import gc
import timeit
import datafusion as df
from datafusion import functions as f
from datafusion import col
from pyarrow import csv as pacsv
# exec(open("./_helpers/helpers.py").read())
def ans_shape(batches):
rows, cols = 0, 0
for batch in batches:
rows += batch.num_rows
if cols == 0:
cols = batch.num_columns
else:
assert(cols == batch.num_columns)
return rows, cols
# ver = df.__version__
ver = "7.0.0"
git = ""
task = "groupby"
solution = "datafusion"
fun = ".groupby"
cache = "TRUE"
on_disk = "FALSE"
data_name = os.environ["SRC_DATANAME"]
src_grp = os.path.join("data", data_name + ".csv")
print("loading dataset %s" % src_grp, flush=True)
data = pacsv.read_csv(src_grp, convert_options=pacsv.ConvertOptions(auto_dict_encode=True))
print("dataset loaded")
ctx = df.ExecutionContext()
ctx.register_record_batches("x", [data.to_batches()])
print("registered record batches")
# cols = ctx.sql("SHOW columns from x")
# ans.show()
in_rows = data.num_rows
# print(in_rows, flush=True)
task_init = timeit.default_timer()
question = "sum v1 by id1" # q1
gc.collect()
t_start = timeit.default_timer()
ans = ctx.sql("SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1").collect()
shape = ans_shape(ans)
# print(shape, flush=True)
t = timeit.default_timer() - t_start
print(f"q1: {t}")
# m = memory_usage()
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
question = "sum v1 by id1:id2" # q2
gc.collect()
t_start = timeit.default_timer()
ans = ctx.sql("SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2").collect()
shape = ans_shape(ans)
# print(shape, flush=True)
t = timeit.default_timer() - t_start
print(f"q2: {t}")
# m = memory_usage()
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
question = "sum v1 mean v3 by id3" # q3
gc.collect()
t_start = timeit.default_timer()
ans = ctx.sql("SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3").collect()
shape = ans_shape(ans)
# print(shape, flush=True)
t = timeit.default_timer() - t_start
print(f"q3: {t}")
# m = memory_usage()
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v3"))]).collect()[0].to_pandas().to_numpy()[0]
chkt = timeit.default_timer() - t_start
# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
question = "mean v1:v3 by id4" # q4
gc.collect()
t_start = timeit.default_timer()
ans = ctx.sql("SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM x GROUP BY id4").collect()
shape = ans_shape(ans)
# print(shape, flush=True)
t = timeit.default_timer() - t_start
print(f"q4: {t}")
# m = memory_usage()
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2")), f.sum(col("v3"))]).collect()[0].to_pandas().to_numpy()[0]
chkt = timeit.default_timer() - t_start
# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
question = "sum v1:v3 by id6" # q5
gc.collect()
t_start = timeit.default_timer()
ans = ctx.sql("SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM x GROUP BY id6").collect()
shape = ans_shape(ans)
# print(shape, flush=True)
t = timeit.default_timer() - t_start
print(f"q5: {t}")
# m = memory_usage()
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2")), f.sum(col("v3"))]).collect()[0].to_pandas().to_numpy()[0]
chkt = timeit.default_timer() - t_start
# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
question = "median v3 sd v3 by id4 id5" # q6
gc.collect()
t_start = timeit.default_timer()
ans = ctx.sql("SELECT id4, id5, approx_percentile_cont(v3, .5) AS median_v3, stddev(v3) AS stddev_v3 FROM x GROUP BY id4, id5").collect()
shape = ans_shape(ans)
# print(shape, flush=True)
t = timeit.default_timer() - t_start
print(f"q6: {t}")
# m = memory_usage()
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("median_v3")), f.sum(col("stddev_v3"))]).collect()[0].to_pandas().to_numpy()[0]
chkt = timeit.default_timer() - t_start
# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
question = "max v1 - min v2 by id3" # q7
gc.collect()
t_start = timeit.default_timer()
ans = ctx.sql("SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM x GROUP BY id3").collect()
shape = ans_shape(ans)
# print(shape, flush=True)
t = timeit.default_timer() - t_start
print(f"q7: {t}")
# m = memory_usage()
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("range_v1_v2"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
question = "largest two v3 by id6" # q8
gc.collect()
t_start = timeit.default_timer()
ans = ctx.sql("SELECT id6, v3 from (SELECT id6, v3, row_number() OVER (PARTITION BY id6 ORDER BY v3 DESC) AS row FROM x) t WHERE row <= 2").collect()
shape = ans_shape(ans)
# print(shape, flush=True)
t = timeit.default_timer() - t_start
print(f"q8: {t}")
# m = memory_usage()
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("v3"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
question = "regression v1 v2 by id2 id4" # q9
gc.collect()
t_start = timeit.default_timer()
ans = ctx.sql("SELECT corr(v1, v2) as corr FROM x GROUP BY id2, id4").collect()
shape = ans_shape(ans)
# print(shape, flush=True)
t = timeit.default_timer() - t_start
print(f"q9: {t}")
# m = memory_usage()
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("corr"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
question = "sum v3 count by id1:id6" # q10
gc.collect()
t_start = timeit.default_timer()
ans = ctx.sql("SELECT id1, id2, id3, id4, id5, id6, SUM(v3) as v3, COUNT(*) AS cnt FROM x GROUP BY id1, id2, id3, id4, id5, id6").collect()
shape = ans_shape(ans)
# print(shape, flush=True)
t = timeit.default_timer() - t_start
print(f"q10: {t}")
# m = memory_usage()
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("v3")), f.sum(col("cnt"))]).collect()[0].to_pandas().to_numpy()[0]
chkt = timeit.default_timer() - t_start
# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
print("grouping finished, took %0.fs" % (timeit.default_timer() - task_init), flush=True)
exit(0)