# python/adbc_driver_postgresql/benchmarks/benchmarks.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import abc
import asyncio
import itertools
import os
import asyncpg
import duckdb
import pandas
import psycopg
import sqlalchemy
import adbc_driver_postgresql.dbapi
class BenchmarkBase(abc.ABC):
    """Shared setup/teardown for benchmarks of PostgreSQL client stacks.

    Opens one connection per client library (asyncpg, ADBC, DuckDB via
    postgres_scanner, and SQLAlchemy) against the database identified by
    the ADBC_POSTGRESQL_TEST_URI environment variable, then times fetching
    a prepared benchmark table into a pandas DataFrame with each stack.
    """

    async_conn: asyncpg.Connection
    async_runner: asyncio.Runner
    conn: adbc_driver_postgresql.dbapi.Connection
    duck: duckdb.DuckDBPyConnection
    sqlalchemy_connection: sqlalchemy.engine.base.Connection

    def setup(self, *args, **kwargs) -> None:
        """Connect every client library to the benchmark database."""
        self.uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
        # Subclasses map the asv parameter tuple to a concrete table name.
        self.table = self._make_table_name(*args, **kwargs)
        self.async_runner = asyncio.Runner()
        self.async_conn = self.async_runner.run(asyncpg.connect(dsn=self.uri))
        self.conn = adbc_driver_postgresql.dbapi.connect(self.uri)
        self.duck = duckdb.connect()
        self.duck.sql("INSTALL postgres_scanner")
        self.duck.sql("LOAD postgres_scanner")
        self.duck.sql(f"CALL postgres_attach('{self.uri}')")
        # SQLAlchemy needs an explicit driver in the URI scheme.
        uri = self.uri.replace("postgres://", "postgresql+psycopg2://")
        self.sqlalchemy_connection = sqlalchemy.create_engine(uri).connect()

    def teardown(self, *args, **kwargs) -> None:
        """Close every connection opened by setup()."""
        # Close the asyncpg connection while its event loop is still
        # running; previously the runner was closed with the connection
        # still open, leaking the server-side session.
        self.async_runner.run(self.async_conn.close())
        self.async_runner.close()
        self.conn.close()
        # The DuckDB connection was previously never closed.
        self.duck.close()
        self.sqlalchemy_connection.close()

    @abc.abstractmethod
    def _make_table_name(self, *args, **kwargs) -> str:
        """Return the benchmark table name for the given asv parameters."""
        ...

    def time_pandas_adbc(self, row_count: int, data_type: str) -> None:
        """Fetch the table into a DataFrame via the ADBC driver."""
        with self.conn.cursor() as cursor:
            cursor.execute(f"SELECT * FROM {self.table}")
            cursor.fetch_df()

    def time_pandas_asyncpg(self, row_count: int, data_type: str) -> None:
        """Fetch the table via asyncpg, then build a DataFrame."""
        records = self.async_runner.run(
            self.async_conn.fetch(f"SELECT * FROM {self.table}")
        )
        pandas.DataFrame(records)

    # TODO: fails with 'undefined symbol' (probably need to get it into Conda)
    # def time_pandas_pgeon(self, row_count: int) -> None:
    #     pgeon.copy_query(self.uri, f"SELECT * FROM {self.table}").to_pandas()

    def time_pandas_psycopg2(self, row_count: int, data_type: str) -> None:
        """Fetch the table into a DataFrame via SQLAlchemy/psycopg2."""
        pandas.read_sql_table(self.table, self.sqlalchemy_connection)

    def time_pandas_duckdb(self, row_count: int, data_type: str) -> None:
        """Fetch the table into a DataFrame via DuckDB's postgres_scanner."""
        self.duck.sql(f"SELECT * FROM {self.table}").fetchdf()
class OneColumnSuite(BenchmarkBase):
    """Benchmark the time it takes to fetch a single column of a given type."""

    SETUP_QUERIES = [
        "DROP TABLE IF EXISTS {table_name}",
        "CREATE TABLE {table_name} (items {data_type})",
        """INSERT INTO {table_name} (items)
        SELECT generated :: {data_type}
        FROM GENERATE_SERIES(1, {row_count}) temp(generated)""",
        # TODO: does an index matter, do we want to force PostgreSQL
        # to update statistics?
    ]

    param_data = {
        "row_count": [10_000, 100_000, 1_000_000],
        "data_type": ["INT", "BIGINT", "FLOAT", "DOUBLE PRECISION"],
    }
    param_names = list(param_data)
    params = [*param_data.values()]

    def setup_cache(self) -> None:
        """Create and populate one table per (row_count, data_type) combo."""
        self.uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
        with psycopg.connect(self.uri) as conn, conn.cursor() as cursor:
            for row_count, data_type in itertools.product(*self.params):
                name = self._make_table_name(row_count, data_type)
                for template in self.SETUP_QUERIES:
                    statement = template.format(
                        table_name=name,
                        row_count=row_count,
                        data_type=data_type,
                    )
                    cursor.execute(statement)

    def _make_table_name(self, row_count: int, data_type: str) -> str:
        """Derive a lowercase table name from the benchmark parameters."""
        type_suffix = data_type.replace(" ", "_")
        return f"bench_{row_count}_{type_suffix}".lower()
class MultiColumnSuite(BenchmarkBase):
    """Benchmark the time it takes to fetch multiple columns of a given type."""

    SETUP_QUERIES = [
        "DROP TABLE IF EXISTS {table_name}",
        """
        CREATE TABLE {table_name} (
            a {data_type},
            b {data_type},
            c {data_type},
            d {data_type}
        )
        """,
        """
        INSERT INTO {table_name} (a, b, c, d)
        SELECT generated :: {data_type},
               generated :: {data_type},
               generated :: {data_type},
               generated :: {data_type}
        FROM GENERATE_SERIES(1, {row_count}) temp(generated)
        """,
    ]

    param_data = {
        "row_count": [10_000, 100_000, 1_000_000],
        "data_type": ["INT", "BIGINT", "FLOAT", "DOUBLE PRECISION"],
    }
    param_names = list(param_data)
    params = [*param_data.values()]

    def setup_cache(self) -> None:
        """Create and populate one table per (row_count, data_type) combo."""
        self.uri = os.environ["ADBC_POSTGRESQL_TEST_URI"]
        with psycopg.connect(self.uri) as conn, conn.cursor() as cursor:
            for row_count, data_type in itertools.product(*self.params):
                name = self._make_table_name(row_count, data_type)
                for template in self.SETUP_QUERIES:
                    statement = template.format(
                        table_name=name,
                        row_count=row_count,
                        data_type=data_type,
                    )
                    cursor.execute(statement)

    def _make_table_name(self, row_count: int, data_type: str) -> str:
        """Derive a lowercase table name from the benchmark parameters."""
        type_suffix = data_type.replace(" ", "_")
        return f"bench_{row_count}_{type_suffix}".lower()