benchmarks/db-benchmark/join-datafusion.py (251 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import gc import os import timeit import datafusion as df from datafusion import col from datafusion import functions as f from pyarrow import csv as pacsv print("# join-datafusion.py", flush=True) exec(open("./_helpers/helpers.py").read()) def ans_shape(batches) -> tuple[int, int]: rows, cols = 0, 0 for batch in batches: rows += batch.num_rows if cols == 0: cols = batch.num_columns else: assert cols == batch.num_columns return rows, cols ver = df.__version__ task = "join" git = "" solution = "datafusion" fun = ".join" cache = "TRUE" on_disk = "FALSE" data_name = os.environ["SRC_DATANAME"] src_jn_x = os.path.join("data", data_name + ".csv") y_data_name = join_to_tbls(data_name) src_jn_y = [ os.path.join("data", y_data_name[0] + ".csv"), os.path.join("data", y_data_name[1] + ".csv"), os.path.join("data", y_data_name[2] + ".csv"), ] if len(src_jn_y) != 3: error_msg = "Something went wrong in preparing files used for join" raise Exception(error_msg) print( "loading datasets " + data_name + ", " + y_data_name[0] + ", " + y_data_name[1] + ", " + y_data_name[2], flush=True, ) ctx = df.SessionContext() print(ctx) # TODO we should be applying projections to these table reads to create relations # of different sizes x_data = pacsv.read_csv( src_jn_x, convert_options=pacsv.ConvertOptions(auto_dict_encode=True) ) ctx.register_record_batches("x", [x_data.to_batches()]) small_data = pacsv.read_csv( src_jn_y[0], convert_options=pacsv.ConvertOptions(auto_dict_encode=True) ) ctx.register_record_batches("small", [small_data.to_batches()]) medium_data = pacsv.read_csv( src_jn_y[1], convert_options=pacsv.ConvertOptions(auto_dict_encode=True) ) ctx.register_record_batches("medium", [medium_data.to_batches()]) large_data = pacsv.read_csv( src_jn_y[2], convert_options=pacsv.ConvertOptions(auto_dict_encode=True) ) ctx.register_record_batches("large", [large_data.to_batches()]) print(x_data.num_rows, flush=True) print(small_data.num_rows, flush=True) print(medium_data.num_rows, flush=True) print(large_data.num_rows, flush=True) task_init = timeit.default_timer() print("joining...", flush=True) question = "small inner on int" # q1 gc.collect() t_start = timeit.default_timer() ans = ctx.sql( "SELECT x.id1, x.id2, x.id3, x.id4 as xid4, small.id4 as smallid4, x.id5, x.id6, x.v1, small.v2 FROM x INNER JOIN small ON x.id1 = small.id1" ).collect() # ans = ctx.sql("SELECT * FROM x INNER JOIN small ON x.id1 = small.id1").collect() # print(set([b.schema for b in ans])) shape = ans_shape(ans) # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q1: {t}") t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0] chkt = timeit.default_timer() - t_start m = memory_usage() write_log( task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk, ) del ans gc.collect() question = "medium inner on int" # q2 gc.collect() t_start = timeit.default_timer() ans = ctx.sql( "SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x INNER JOIN medium ON x.id2 = medium.id2" ).collect() shape = ans_shape(ans) # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q2: {t}") t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0] chkt = timeit.default_timer() - t_start m = memory_usage() write_log( task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk, ) del ans gc.collect() question = "medium outer on int" # q3 gc.collect() t_start = timeit.default_timer() ans = ctx.sql( "SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x LEFT JOIN medium ON x.id2 = medium.id2" ).collect() shape = ans_shape(ans) # print(shape, flush=True) t = timeit.default_timer() - t_start print(f"q3: {t}") t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0] chkt = timeit.default_timer() - t_start m = memory_usage() write_log( task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk, ) del ans gc.collect() question = "medium inner on factor" # q4 gc.collect() t_start = timeit.default_timer() ans = ctx.sql( "SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x LEFT JOIN medium ON x.id5 = medium.id5" ).collect() shape = ans_shape(ans) # print(shape) t = timeit.default_timer() - t_start print(f"q4: {t}") t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0] chkt = timeit.default_timer() - t_start m = memory_usage() write_log( task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk, ) del ans gc.collect() question = "big inner on int" # q5 gc.collect() t_start = timeit.default_timer() ans = ctx.sql( "SELECT x.id1 as xid1, large.id1 as largeid1, x.id2 as xid2, large.id2 as largeid2, x.id3, x.id4 as xid4, large.id4 as largeid4, x.id5 as xid5, large.id5 as largeid5, x.id6 as xid6, large.id6 as largeid6, x.v1, large.v2 FROM x LEFT JOIN large ON x.id3 = large.id3" ).collect() shape = ans_shape(ans) # print(shape) t = timeit.default_timer() - t_start print(f"q5: {t}") t_start = timeit.default_timer() df = ctx.create_dataframe([ans]) chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0] chkt = timeit.default_timer() - t_start m = memory_usage() write_log( task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk, ) del ans gc.collect() print( "joining finished, took %0.fs" % (timeit.default_timer() - task_init), flush=True, ) exit(0)