dev/bench/run_bench.py
#!/usr/bin/env python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import argparse
import time
import psutil
import adbc_driver_snowflake.dbapi
# import pyodbc
# import snowflake.connector
process = psutil.Process()
SAMPLE_RATE = 10  # record data once every SAMPLE_RATE executions
can_draw = True
try:
import matplotlib.pyplot as plt
except ImportError:
print("graphs cannot be drawn as matplotlib is not installed")
can_draw = False
def task_execution_decorator(func, perf_file, memory_file):
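    """Wrap ``func`` so each call is timed and the process RSS is sampled.

    Every SAMPLE_RATE-th call, the elapsed time (seconds) and resident
    memory (MB) are appended to ``perf_file`` and ``memory_file``.
    """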
count = 0
def wrapper(*args, **kwargs):
start = time.time()
func(*args, **kwargs)
memory_usage = (
process.memory_info().rss / 1024 / 1024
) # rss is in bytes, we convert to MB
period = time.time() - start
nonlocal count
if count % SAMPLE_RATE == 0:
perf_file.write(str(period) + "\n")
print(f"execution {count}")
print(f"memory usage: {memory_usage} MB")
print(f"execution time: {period} s")
memory_file.write(str(memory_usage) + "\n")
count += 1
return wrapper
def task_fetch_arrow_batches(cursor, table_name, row_count_limit=50000):
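    """Fetch up to ``row_count_limit`` rows with the snowflake-python-connector
    ``fetch_arrow_batches()`` API and drain the batch iterator."""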
ret = cursor.execute(
f"select * from {table_name} limit {row_count_limit}"
).fetch_arrow_batches() # interface for snowflake-python-connector
for _ in ret:
pass
def task_fetch_record_batch(cursor, table_name, row_count_limit=50000):
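    """Fetch up to ``row_count_limit`` rows through the ADBC DBAPI
    ``fetch_record_batch()`` reader and drain the record batches."""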
cursor.execute(f"select * from {table_name} limit {row_count_limit}")
ret = cursor.fetch_record_batch() # interface we provide for dbapi
for _ in ret:
pass
def task_fetch_odbc(cursor, table_name, row_count_limit=50000):
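    """Fetch up to ``row_count_limit`` rows through pyodbc, draining the
    result set in ``fetchmany()`` chunks."""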
cursor.execute(f"select * from {table_name} limit {row_count_limit}")
    batch_size = max(1, row_count_limit // 1000)  # fetchmany() requires an int; guard against 0
while True:
rows = cursor.fetchmany(batch_size)
if len(rows) == 0:
break
def execute_task(task, cursor, table_name, iteration_cnt, row_count_limit=50000):
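    """Run ``task`` against ``table_name`` ``iteration_cnt`` times."""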
for _ in range(iteration_cnt):
task(cursor, table_name, row_count_limit)
ADBC_CONNECTION_PARAMETERS = {
"user": "...",
"password": "...",
"adbc.snowflake.sql.account": "...",
}
PYODBC_CONNECTION_STR = "DSN=..."
CONNECTION_PARAMETERS = {
"user": "...",
"password": "...",
"account": "...",
}
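# The "..." values above are placeholders: fill in real credentials (and a
# real DSN for the pyodbc variant) before running the benchmark.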
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--iteration_cnt",
type=int,
default=50,
help="how many times to run the test function, default is 5000",
)
parser.add_argument(
"--row_count",
type=int,
default=10000,
help="how many rows of data to fetch",
)
parser.add_argument(
"--test_table_name",
type=str,
default="SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM",
hep="an existing test table that has data prepared",
)
args = parser.parse_args()
test_table_name = args.test_table_name
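    # One float per line is written to these files during the run, then read
    # back below for plotting.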
perf_record_file = "stress_perf_record"
memory_record_file = "stress_memory_record"
# with pyodbc.connect(PYODBC_CONNECTION_STR) as conn, conn.cursor() as cursor:
# with snowflake.connector.connect(
# **CONNECTION_PARAMETERS
# ) as conn, conn.cursor() as cursor:
with adbc_driver_snowflake.dbapi.connect(
**ADBC_CONNECTION_PARAMETERS
) as conn, conn.cursor() as cursor:
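        # Tune the Snowflake driver's Arrow fetch pipeline: how many result
        # chunks to prefetch concurrently and how many batches to buffer in
        # the result queue.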
        cursor.adbc_statement.set_options(
            **{
                # option values are passed as strings, which the low-level
                # set_options interface expects
                "adbc.snowflake.rpc.prefetch_concurrency": "4",
                "adbc.rpc.result_queue_size": "100",
            }
        )
with open(perf_record_file, "w") as perf_file, open(
memory_record_file, "w"
) as memory_file:
# task = task_execution_decorator( # snowflake python connector
# task_fetch_arrow_batches, perf_file, memory_file)
# task = task_execution_decorator( # pyodbc
# task_fetch_odbc, perf_file, memory_file)
task = task_execution_decorator(
task_fetch_record_batch, perf_file, memory_file
)
execute_task(
task, cursor, test_table_name, args.iteration_cnt, args.row_count
)
if can_draw:
with open(perf_record_file) as perf_file, open(
memory_record_file
) as memory_file:
            # each line holds one sampled measurement (every SAMPLE_RATE-th execution)
perf_lines = perf_file.readlines()
perf_records = [float(line) for line in perf_lines]
memory_lines = memory_file.readlines()
memory_records = [float(line) for line in memory_lines]
            plt.plot(range(len(perf_records)), perf_records)
plt.title("per iteration execution time")
plt.show(block=False)
plt.figure()
            plt.plot(range(len(memory_records)), memory_records)
plt.title("memory usage")
plt.show(block=True)