c/driver/postgresql/postgresql.cc (401 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// A libpq-based PostgreSQL driver for ADBC.
#include <cstring>
#include <memory>
#include <adbc.h>
#include "common/utils.h"
#include "connection.h"
#include "database.h"
#include "statement.h"
using adbcpq::PostgresConnection;
using adbcpq::PostgresDatabase;
using adbcpq::PostgresStatement;
// ---------------------------------------------------------------------
// ADBC interface implementation - as private functions so that these
// don't get replaced by the dynamic linker. If we implemented these
// under the Adbc* names, then in DriverInit, the linker may resolve
// functions to the address of the functions provided by the driver
// manager instead of our functions.
//
// We could also:
// - Play games with RTLD_DEEPBIND - but this doesn't work with ASan
// - Use __attribute__((visibility("protected"))) - but this is
// apparently poorly supported by some linkers
// - Play with -Bsymbolic(-functions) - but this has other
// consequences and complicates the build setup
//
// So in the end some manual effort here was chosen.
// ---------------------------------------------------------------------
// AdbcDatabase
namespace {
// Initialize the PostgresDatabase attached to `database`.
//
// Returns ADBC_STATUS_INVALID_STATE when the handle is null or has no
// private_data; otherwise returns whatever PostgresDatabase::Init reports.
AdbcStatusCode PostgresDatabaseInit(struct AdbcDatabase* database,
                                    struct AdbcError* error) {
  if (database == nullptr || database->private_data == nullptr) {
    return ADBC_STATUS_INVALID_STATE;
  }
  // private_data stores a pointer to a shared_ptr holding the implementation.
  using Holder = std::shared_ptr<PostgresDatabase>;
  const Holder& impl = *reinterpret_cast<Holder*>(database->private_data);
  return impl->Init(error);
}
// Allocate a fresh PostgresDatabase and attach it to `database`.
//
// Fails with ADBC_STATUS_INVALID_STATE (and a message in `error`) when the
// handle is null or already carries private_data. On success private_data
// points at a newly allocated shared_ptr that owns the implementation.
AdbcStatusCode PostgresDatabaseNew(struct AdbcDatabase* database,
                                   struct AdbcError* error) {
  const char* problem = nullptr;
  if (database == nullptr) {
    problem = "[libpq] database must not be null";
  } else if (database->private_data != nullptr) {
    problem = "[libpq] database is already initialized";
  }
  if (problem != nullptr) {
    SetError(error, "%s", problem);
    return ADBC_STATUS_INVALID_STATE;
  }

  using Holder = std::shared_ptr<PostgresDatabase>;
  database->private_data = new Holder(std::make_shared<PostgresDatabase>());
  return ADBC_STATUS_OK;
}
// Tear down the driver state attached to an AdbcDatabase handle.
//
// Returns ADBC_STATUS_INVALID_STATE if `database` is null or carries no
// private_data. Otherwise calls PostgresDatabase::Release, deletes the
// shared_ptr holder stored in private_data, clears private_data, and
// returns the status reported by Release.
AdbcStatusCode PostgresDatabaseRelease(struct AdbcDatabase* database,
                                       struct AdbcError* error) {
  // Guard the handle itself too, matching the other database entry points,
  // so a null handle yields an error status instead of a null dereference.
  if (!database || !database->private_data) return ADBC_STATUS_INVALID_STATE;
  auto* ptr =
      reinterpret_cast<std::shared_ptr<PostgresDatabase>*>(database->private_data);
  const AdbcStatusCode status = (*ptr)->Release(error);
  // The holder is freed regardless of the status returned by Release.
  delete ptr;
  database->private_data = nullptr;
  return status;
}
// Forward a key/value option to the PostgresDatabase attached to `database`.
//
// Returns ADBC_STATUS_INVALID_STATE when the handle is null or has no
// private_data; otherwise returns the result of PostgresDatabase::SetOption.
AdbcStatusCode PostgresDatabaseSetOption(struct AdbcDatabase* database, const char* key,
                                         const char* value, struct AdbcError* error) {
  if (database == nullptr || database->private_data == nullptr) {
    return ADBC_STATUS_INVALID_STATE;
  }
  using Holder = std::shared_ptr<PostgresDatabase>;
  const Holder& impl = *reinterpret_cast<Holder*>(database->private_data);
  return impl->SetOption(key, value, error);
}
} // namespace
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresDatabaseInit and returns its status.
AdbcStatusCode AdbcDatabaseInit(struct AdbcDatabase* database, struct AdbcError* error) {
  const AdbcStatusCode status = PostgresDatabaseInit(database, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresDatabaseNew and returns its status.
AdbcStatusCode AdbcDatabaseNew(struct AdbcDatabase* database, struct AdbcError* error) {
  const AdbcStatusCode status = PostgresDatabaseNew(database, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresDatabaseRelease and returns its status.
AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database,
                                   struct AdbcError* error) {
  const AdbcStatusCode status = PostgresDatabaseRelease(database, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresDatabaseSetOption and returns its status.
AdbcStatusCode AdbcDatabaseSetOption(struct AdbcDatabase* database, const char* key,
                                     const char* value, struct AdbcError* error) {
  const AdbcStatusCode status = PostgresDatabaseSetOption(database, key, value, error);
  return status;
}
// ---------------------------------------------------------------------
// AdbcConnection
namespace {
// Call Commit on the PostgresConnection attached to `connection`.
//
// Returns ADBC_STATUS_INVALID_STATE when the handle has no private_data;
// otherwise returns the result of PostgresConnection::Commit.
AdbcStatusCode PostgresConnectionCommit(struct AdbcConnection* connection,
                                        struct AdbcError* error) {
  if (connection->private_data == nullptr) {
    return ADBC_STATUS_INVALID_STATE;
  }
  using Holder = std::shared_ptr<PostgresConnection>;
  const Holder& impl = *reinterpret_cast<Holder*>(connection->private_data);
  return impl->Commit(error);
}
// Call GetInfo on the PostgresConnection attached to `connection`.
//
// Returns ADBC_STATUS_INVALID_STATE when the handle has no private_data.
// Otherwise the handle itself, the requested info codes, the output stream
// and the error slot are handed to PostgresConnection::GetInfo.
AdbcStatusCode PostgresConnectionGetInfo(struct AdbcConnection* connection,
                                         uint32_t* info_codes, size_t info_codes_length,
                                         struct ArrowArrayStream* stream,
                                         struct AdbcError* error) {
  if (connection->private_data == nullptr) {
    return ADBC_STATUS_INVALID_STATE;
  }
  using Holder = std::shared_ptr<PostgresConnection>;
  const Holder& impl = *reinterpret_cast<Holder*>(connection->private_data);
  return impl->GetInfo(connection, info_codes, info_codes_length, stream, error);
}
// Call GetObjects on the PostgresConnection attached to `connection`.
//
// Returns ADBC_STATUS_INVALID_STATE when the handle has no private_data.
// Otherwise every argument, including the handle itself, is passed through
// to PostgresConnection::GetObjects.
AdbcStatusCode PostgresConnectionGetObjects(
    struct AdbcConnection* connection, int depth, const char* catalog,
    const char* db_schema, const char* table_name, const char** table_types,
    const char* column_name, struct ArrowArrayStream* stream, struct AdbcError* error) {
  if (connection->private_data == nullptr) {
    return ADBC_STATUS_INVALID_STATE;
  }
  using Holder = std::shared_ptr<PostgresConnection>;
  const Holder& impl = *reinterpret_cast<Holder*>(connection->private_data);
  return impl->GetObjects(connection, depth, catalog, db_schema, table_name, table_types,
                          column_name, stream, error);
}
// Call GetTableSchema on the PostgresConnection attached to `connection`.
//
// Returns ADBC_STATUS_INVALID_STATE when the handle has no private_data;
// otherwise returns the result of PostgresConnection::GetTableSchema.
AdbcStatusCode PostgresConnectionGetTableSchema(
    struct AdbcConnection* connection, const char* catalog, const char* db_schema,
    const char* table_name, struct ArrowSchema* schema, struct AdbcError* error) {
  if (connection->private_data == nullptr) {
    return ADBC_STATUS_INVALID_STATE;
  }
  using Holder = std::shared_ptr<PostgresConnection>;
  const Holder& impl = *reinterpret_cast<Holder*>(connection->private_data);
  return impl->GetTableSchema(catalog, db_schema, table_name, schema, error);
}
// Call GetTableTypes on the PostgresConnection attached to `connection`.
//
// Returns ADBC_STATUS_INVALID_STATE when the handle has no private_data.
// Otherwise the handle, output stream and error slot are passed to
// PostgresConnection::GetTableTypes.
AdbcStatusCode PostgresConnectionGetTableTypes(struct AdbcConnection* connection,
                                               struct ArrowArrayStream* stream,
                                               struct AdbcError* error) {
  if (connection->private_data == nullptr) {
    return ADBC_STATUS_INVALID_STATE;
  }
  using Holder = std::shared_ptr<PostgresConnection>;
  const Holder& impl = *reinterpret_cast<Holder*>(connection->private_data);
  return impl->GetTableTypes(connection, stream, error);
}
// Initialize the PostgresConnection attached to `connection` against
// `database`.
//
// Returns ADBC_STATUS_INVALID_STATE when the connection handle has no
// private_data; otherwise returns the result of PostgresConnection::Init.
AdbcStatusCode PostgresConnectionInit(struct AdbcConnection* connection,
                                      struct AdbcDatabase* database,
                                      struct AdbcError* error) {
  if (connection->private_data == nullptr) {
    return ADBC_STATUS_INVALID_STATE;
  }
  using Holder = std::shared_ptr<PostgresConnection>;
  const Holder& impl = *reinterpret_cast<Holder*>(connection->private_data);
  return impl->Init(database, error);
}
// Allocate a fresh PostgresConnection and attach it to `connection`.
//
// private_data is overwritten with a newly allocated shared_ptr that owns
// the implementation; no check is made for a previous value. Always
// returns ADBC_STATUS_OK.
AdbcStatusCode PostgresConnectionNew(struct AdbcConnection* connection,
                                     struct AdbcError* error) {
  using Holder = std::shared_ptr<PostgresConnection>;
  connection->private_data = new Holder(std::make_shared<PostgresConnection>());
  return ADBC_STATUS_OK;
}
// Partition reads are not supported by this driver.
//
// Returns ADBC_STATUS_INVALID_STATE when the handle has no private_data and
// ADBC_STATUS_NOT_IMPLEMENTED otherwise; no other argument is inspected.
AdbcStatusCode PostgresConnectionReadPartition(struct AdbcConnection* connection,
                                               const uint8_t* serialized_partition,
                                               size_t serialized_length,
                                               struct ArrowArrayStream* out,
                                               struct AdbcError* error) {
  if (connection->private_data != nullptr) {
    return ADBC_STATUS_NOT_IMPLEMENTED;
  }
  return ADBC_STATUS_INVALID_STATE;
}
// Tear down the driver state attached to `connection`.
//
// Returns ADBC_STATUS_INVALID_STATE when the handle has no private_data.
// Otherwise calls PostgresConnection::Release, deletes the shared_ptr
// holder, clears private_data and returns the status from Release.
AdbcStatusCode PostgresConnectionRelease(struct AdbcConnection* connection,
                                         struct AdbcError* error) {
  if (connection->private_data == nullptr) {
    return ADBC_STATUS_INVALID_STATE;
  }
  using Holder = std::shared_ptr<PostgresConnection>;
  Holder* holder = reinterpret_cast<Holder*>(connection->private_data);
  const AdbcStatusCode result = (*holder)->Release(error);
  // The holder is freed regardless of the status returned by Release.
  delete holder;
  connection->private_data = nullptr;
  return result;
}
// Call Rollback on the PostgresConnection attached to `connection`.
//
// Returns ADBC_STATUS_INVALID_STATE when the handle has no private_data;
// otherwise returns the result of PostgresConnection::Rollback.
AdbcStatusCode PostgresConnectionRollback(struct AdbcConnection* connection,
                                          struct AdbcError* error) {
  if (connection->private_data == nullptr) {
    return ADBC_STATUS_INVALID_STATE;
  }
  using Holder = std::shared_ptr<PostgresConnection>;
  const Holder& impl = *reinterpret_cast<Holder*>(connection->private_data);
  return impl->Rollback(error);
}
// Forward a key/value option to the PostgresConnection attached to
// `connection`.
//
// Returns ADBC_STATUS_INVALID_STATE when the handle has no private_data;
// otherwise returns the result of PostgresConnection::SetOption.
AdbcStatusCode PostgresConnectionSetOption(struct AdbcConnection* connection,
                                           const char* key, const char* value,
                                           struct AdbcError* error) {
  if (connection->private_data == nullptr) {
    return ADBC_STATUS_INVALID_STATE;
  }
  using Holder = std::shared_ptr<PostgresConnection>;
  const Holder& impl = *reinterpret_cast<Holder*>(connection->private_data);
  return impl->SetOption(key, value, error);
}
} // namespace
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresConnectionCommit and returns its status.
AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,
                                    struct AdbcError* error) {
  const AdbcStatusCode status = PostgresConnectionCommit(connection, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresConnectionGetInfo and returns its status.
AdbcStatusCode AdbcConnectionGetInfo(struct AdbcConnection* connection,
                                     uint32_t* info_codes, size_t info_codes_length,
                                     struct ArrowArrayStream* stream,
                                     struct AdbcError* error) {
  const AdbcStatusCode status =
      PostgresConnectionGetInfo(connection, info_codes, info_codes_length, stream, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresConnectionGetObjects and returns its status.
AdbcStatusCode AdbcConnectionGetObjects(struct AdbcConnection* connection, int depth,
                                        const char* catalog, const char* db_schema,
                                        const char* table_name, const char** table_types,
                                        const char* column_name,
                                        struct ArrowArrayStream* stream,
                                        struct AdbcError* error) {
  const AdbcStatusCode status =
      PostgresConnectionGetObjects(connection, depth, catalog, db_schema, table_name,
                                   table_types, column_name, stream, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresConnectionGetTableSchema and returns its status.
AdbcStatusCode AdbcConnectionGetTableSchema(struct AdbcConnection* connection,
                                            const char* catalog, const char* db_schema,
                                            const char* table_name,
                                            struct ArrowSchema* schema,
                                            struct AdbcError* error) {
  const AdbcStatusCode status = PostgresConnectionGetTableSchema(
      connection, catalog, db_schema, table_name, schema, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresConnectionGetTableTypes and returns its status.
AdbcStatusCode AdbcConnectionGetTableTypes(struct AdbcConnection* connection,
                                           struct ArrowArrayStream* stream,
                                           struct AdbcError* error) {
  const AdbcStatusCode status =
      PostgresConnectionGetTableTypes(connection, stream, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresConnectionInit and returns its status.
AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
                                  struct AdbcDatabase* database,
                                  struct AdbcError* error) {
  const AdbcStatusCode status = PostgresConnectionInit(connection, database, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresConnectionNew and returns its status.
AdbcStatusCode AdbcConnectionNew(struct AdbcConnection* connection,
                                 struct AdbcError* error) {
  const AdbcStatusCode status = PostgresConnectionNew(connection, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresConnectionReadPartition and returns its status.
AdbcStatusCode AdbcConnectionReadPartition(struct AdbcConnection* connection,
                                           const uint8_t* serialized_partition,
                                           size_t serialized_length,
                                           struct ArrowArrayStream* out,
                                           struct AdbcError* error) {
  const AdbcStatusCode status = PostgresConnectionReadPartition(
      connection, serialized_partition, serialized_length, out, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresConnectionRelease and returns its status.
AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
                                     struct AdbcError* error) {
  const AdbcStatusCode status = PostgresConnectionRelease(connection, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresConnectionRollback and returns its status.
AdbcStatusCode AdbcConnectionRollback(struct AdbcConnection* connection,
                                      struct AdbcError* error) {
  const AdbcStatusCode status = PostgresConnectionRollback(connection, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresConnectionSetOption and returns its status.
AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const char* key,
                                       const char* value, struct AdbcError* error) {
  const AdbcStatusCode status =
      PostgresConnectionSetOption(connection, key, value, error);
  return status;
}
// ---------------------------------------------------------------------
// AdbcStatement
namespace {
// Bind a single array (with its schema) on the PostgresStatement attached
// to `statement`.
//
// Returns ADBC_STATUS_INVALID_STATE when the handle has no private_data;
// otherwise returns the result of PostgresStatement::Bind(values, schema).
AdbcStatusCode PostgresStatementBind(struct AdbcStatement* statement,
                                     struct ArrowArray* values,
                                     struct ArrowSchema* schema,
                                     struct AdbcError* error) {
  if (statement->private_data == nullptr) {
    return ADBC_STATUS_INVALID_STATE;
  }
  using Holder = std::shared_ptr<PostgresStatement>;
  const Holder& impl = *reinterpret_cast<Holder*>(statement->private_data);
  return impl->Bind(values, schema, error);
}
// Bind an array stream on the PostgresStatement attached to `statement`.
//
// Returns ADBC_STATUS_INVALID_STATE when the handle has no private_data;
// otherwise returns the result of the stream overload of
// PostgresStatement::Bind.
AdbcStatusCode PostgresStatementBindStream(struct AdbcStatement* statement,
                                           struct ArrowArrayStream* stream,
                                           struct AdbcError* error) {
  if (statement->private_data == nullptr) {
    return ADBC_STATUS_INVALID_STATE;
  }
  using Holder = std::shared_ptr<PostgresStatement>;
  const Holder& impl = *reinterpret_cast<Holder*>(statement->private_data);
  return impl->Bind(stream, error);
}
// Partitioned execution is not supported by this driver.
//
// Returns ADBC_STATUS_INVALID_STATE when the handle has no private_data and
// ADBC_STATUS_NOT_IMPLEMENTED otherwise; no other argument is inspected.
AdbcStatusCode PostgresStatementExecutePartitions(struct AdbcStatement* statement,
                                                  struct ArrowSchema* schema,
                                                  struct AdbcPartitions* partitions,
                                                  int64_t* rows_affected,
                                                  struct AdbcError* error) {
  if (statement->private_data != nullptr) {
    return ADBC_STATUS_NOT_IMPLEMENTED;
  }
  return ADBC_STATUS_INVALID_STATE;
}
// Execute the PostgresStatement attached to `statement`.
//
// Returns ADBC_STATUS_INVALID_STATE when the handle has no private_data;
// otherwise returns the result of PostgresStatement::ExecuteQuery, which
// receives `output`, `rows_affected` and `error`.
AdbcStatusCode PostgresStatementExecuteQuery(struct AdbcStatement* statement,
                                             struct ArrowArrayStream* output,
                                             int64_t* rows_affected,
                                             struct AdbcError* error) {
  if (statement->private_data == nullptr) {
    return ADBC_STATUS_INVALID_STATE;
  }
  using Holder = std::shared_ptr<PostgresStatement>;
  const Holder& impl = *reinterpret_cast<Holder*>(statement->private_data);
  return impl->ExecuteQuery(output, rows_affected, error);
}
// Partition descriptors are not supported by this driver.
//
// Unconditionally returns ADBC_STATUS_NOT_IMPLEMENTED; none of the
// arguments are read or written.
AdbcStatusCode PostgresStatementGetPartitionDesc(struct AdbcStatement* statement,
                                                 uint8_t* partition_desc,
                                                 struct AdbcError* error) {
  const AdbcStatusCode unsupported = ADBC_STATUS_NOT_IMPLEMENTED;
  return unsupported;
}
// Partition descriptors are not supported by this driver.
//
// Unconditionally returns ADBC_STATUS_NOT_IMPLEMENTED; none of the
// arguments are read or written.
AdbcStatusCode PostgresStatementGetPartitionDescSize(struct AdbcStatement* statement,
                                                     size_t* length,
                                                     struct AdbcError* error) {
  const AdbcStatusCode unsupported = ADBC_STATUS_NOT_IMPLEMENTED;
  return unsupported;
}
// Call GetParameterSchema on the PostgresStatement attached to `statement`.
//
// Returns ADBC_STATUS_INVALID_STATE when the handle has no private_data;
// otherwise returns the result of PostgresStatement::GetParameterSchema.
AdbcStatusCode PostgresStatementGetParameterSchema(struct AdbcStatement* statement,
                                                   struct ArrowSchema* schema,
                                                   struct AdbcError* error) {
  if (statement->private_data == nullptr) {
    return ADBC_STATUS_INVALID_STATE;
  }
  using Holder = std::shared_ptr<PostgresStatement>;
  const Holder& impl = *reinterpret_cast<Holder*>(statement->private_data);
  return impl->GetParameterSchema(schema, error);
}
// Allocate a fresh PostgresStatement, attach it to `statement`, and let it
// set itself up against `connection`.
//
// private_data is assigned before PostgresStatement::New runs, so it stays
// set even when New reports a failure. Returns the status from New.
AdbcStatusCode PostgresStatementNew(struct AdbcConnection* connection,
                                    struct AdbcStatement* statement,
                                    struct AdbcError* error) {
  using Holder = std::shared_ptr<PostgresStatement>;
  Holder created = std::make_shared<PostgresStatement>();
  statement->private_data = new Holder(created);
  const AdbcStatusCode status = created->New(connection, error);
  return status;
}
// Call Prepare on the PostgresStatement attached to `statement`.
//
// Returns ADBC_STATUS_INVALID_STATE when the handle has no private_data;
// otherwise returns the result of PostgresStatement::Prepare.
AdbcStatusCode PostgresStatementPrepare(struct AdbcStatement* statement,
                                        struct AdbcError* error) {
  if (statement->private_data == nullptr) {
    return ADBC_STATUS_INVALID_STATE;
  }
  using Holder = std::shared_ptr<PostgresStatement>;
  const Holder& impl = *reinterpret_cast<Holder*>(statement->private_data);
  return impl->Prepare(error);
}
// Tear down the driver state attached to `statement`.
//
// Returns ADBC_STATUS_INVALID_STATE when the handle has no private_data.
// Otherwise calls PostgresStatement::Release, deletes the shared_ptr
// holder, clears private_data and returns the status from Release.
AdbcStatusCode PostgresStatementRelease(struct AdbcStatement* statement,
                                        struct AdbcError* error) {
  if (statement->private_data == nullptr) {
    return ADBC_STATUS_INVALID_STATE;
  }
  using Holder = std::shared_ptr<PostgresStatement>;
  Holder* holder = reinterpret_cast<Holder*>(statement->private_data);
  auto result = (*holder)->Release(error);
  // The holder is freed regardless of the status returned by Release.
  delete holder;
  statement->private_data = nullptr;
  return result;
}
// Forward a key/value option to the PostgresStatement attached to
// `statement`.
//
// Returns ADBC_STATUS_INVALID_STATE when the handle has no private_data;
// otherwise returns the result of PostgresStatement::SetOption.
AdbcStatusCode PostgresStatementSetOption(struct AdbcStatement* statement,
                                          const char* key, const char* value,
                                          struct AdbcError* error) {
  if (statement->private_data == nullptr) {
    return ADBC_STATUS_INVALID_STATE;
  }
  using Holder = std::shared_ptr<PostgresStatement>;
  const Holder& impl = *reinterpret_cast<Holder*>(statement->private_data);
  return impl->SetOption(key, value, error);
}
// Hand the SQL text to the PostgresStatement attached to `statement`.
//
// Returns ADBC_STATUS_INVALID_STATE when the handle has no private_data;
// otherwise returns the result of PostgresStatement::SetSqlQuery.
AdbcStatusCode PostgresStatementSetSqlQuery(struct AdbcStatement* statement,
                                            const char* query, struct AdbcError* error) {
  if (statement->private_data == nullptr) {
    return ADBC_STATUS_INVALID_STATE;
  }
  using Holder = std::shared_ptr<PostgresStatement>;
  const Holder& impl = *reinterpret_cast<Holder*>(statement->private_data);
  return impl->SetSqlQuery(query, error);
}
} // namespace
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresStatementBind and returns its status.
AdbcStatusCode AdbcStatementBind(struct AdbcStatement* statement,
                                 struct ArrowArray* values, struct ArrowSchema* schema,
                                 struct AdbcError* error) {
  const AdbcStatusCode status = PostgresStatementBind(statement, values, schema, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresStatementBindStream and returns its status.
AdbcStatusCode AdbcStatementBindStream(struct AdbcStatement* statement,
                                       struct ArrowArrayStream* stream,
                                       struct AdbcError* error) {
  const AdbcStatusCode status = PostgresStatementBindStream(statement, stream, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresStatementExecutePartitions and returns its status.
AdbcStatusCode AdbcStatementExecutePartitions(struct AdbcStatement* statement,
                                              ArrowSchema* schema,
                                              struct AdbcPartitions* partitions,
                                              int64_t* rows_affected,
                                              struct AdbcError* error) {
  const AdbcStatusCode status = PostgresStatementExecutePartitions(
      statement, schema, partitions, rows_affected, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresStatementExecuteQuery and returns its status.
AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement,
                                         struct ArrowArrayStream* output,
                                         int64_t* rows_affected,
                                         struct AdbcError* error) {
  const AdbcStatusCode status =
      PostgresStatementExecuteQuery(statement, output, rows_affected, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresStatementGetPartitionDesc and returns its status.
AdbcStatusCode AdbcStatementGetPartitionDesc(struct AdbcStatement* statement,
                                             uint8_t* partition_desc,
                                             struct AdbcError* error) {
  const AdbcStatusCode status =
      PostgresStatementGetPartitionDesc(statement, partition_desc, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresStatementGetPartitionDescSize and returns its status.
AdbcStatusCode AdbcStatementGetPartitionDescSize(struct AdbcStatement* statement,
                                                 size_t* length,
                                                 struct AdbcError* error) {
  const AdbcStatusCode status =
      PostgresStatementGetPartitionDescSize(statement, length, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresStatementGetParameterSchema and returns its status.
AdbcStatusCode AdbcStatementGetParameterSchema(struct AdbcStatement* statement,
                                               struct ArrowSchema* schema,
                                               struct AdbcError* error) {
  const AdbcStatusCode status =
      PostgresStatementGetParameterSchema(statement, schema, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresStatementNew and returns its status.
AdbcStatusCode AdbcStatementNew(struct AdbcConnection* connection,
                                struct AdbcStatement* statement,
                                struct AdbcError* error) {
  const AdbcStatusCode status = PostgresStatementNew(connection, statement, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresStatementPrepare and returns its status.
AdbcStatusCode AdbcStatementPrepare(struct AdbcStatement* statement,
                                    struct AdbcError* error) {
  const AdbcStatusCode status = PostgresStatementPrepare(statement, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresStatementRelease and returns its status.
AdbcStatusCode AdbcStatementRelease(struct AdbcStatement* statement,
                                    struct AdbcError* error) {
  const AdbcStatusCode status = PostgresStatementRelease(statement, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresStatementSetOption and returns its status.
AdbcStatusCode AdbcStatementSetOption(struct AdbcStatement* statement, const char* key,
                                      const char* value, struct AdbcError* error) {
  const AdbcStatusCode status = PostgresStatementSetOption(statement, key, value, error);
  return status;
}
// Adbc*-named definition; passes its arguments unchanged to the file-local
// PostgresStatementSetSqlQuery and returns its status.
AdbcStatusCode AdbcStatementSetSqlQuery(struct AdbcStatement* statement,
                                        const char* query, struct AdbcError* error) {
  const AdbcStatusCode status = PostgresStatementSetSqlQuery(statement, query, error);
  return status;
}
extern "C" {
// Driver entry point: fill in an AdbcDriver function table with this
// driver's file-local Postgres* implementations.
//
// version:    must equal ADBC_VERSION_1_0_0, otherwise
//             ADBC_STATUS_NOT_IMPLEMENTED is returned and nothing is written.
// raw_driver: pointer to the struct AdbcDriver to populate. A null pointer
//             is rejected with ADBC_STATUS_INVALID_ARGUMENT instead of being
//             passed to memset.
// error:      receives a message when raw_driver is null.
//
// On success the whole struct is zeroed first, so every entry not assigned
// below is left null, and ADBC_STATUS_OK is returned.
ADBC_EXPORT
AdbcStatusCode AdbcDriverInit(int version, void* raw_driver, struct AdbcError* error) {
  if (version != ADBC_VERSION_1_0_0) return ADBC_STATUS_NOT_IMPLEMENTED;
  if (!raw_driver) {
    SetError(error, "%s", "[libpq] driver must not be null");
    return ADBC_STATUS_INVALID_ARGUMENT;
  }

  // raw_driver arrives as void*, so static_cast is the appropriate cast.
  auto* driver = static_cast<struct AdbcDriver*>(raw_driver);
  std::memset(driver, 0, sizeof(*driver));

  driver->DatabaseInit = PostgresDatabaseInit;
  driver->DatabaseNew = PostgresDatabaseNew;
  driver->DatabaseRelease = PostgresDatabaseRelease;
  driver->DatabaseSetOption = PostgresDatabaseSetOption;

  driver->ConnectionCommit = PostgresConnectionCommit;
  driver->ConnectionGetInfo = PostgresConnectionGetInfo;
  driver->ConnectionGetObjects = PostgresConnectionGetObjects;
  driver->ConnectionGetTableSchema = PostgresConnectionGetTableSchema;
  driver->ConnectionGetTableTypes = PostgresConnectionGetTableTypes;
  driver->ConnectionInit = PostgresConnectionInit;
  driver->ConnectionNew = PostgresConnectionNew;
  driver->ConnectionReadPartition = PostgresConnectionReadPartition;
  driver->ConnectionRelease = PostgresConnectionRelease;
  driver->ConnectionRollback = PostgresConnectionRollback;
  driver->ConnectionSetOption = PostgresConnectionSetOption;

  driver->StatementBind = PostgresStatementBind;
  driver->StatementBindStream = PostgresStatementBindStream;
  driver->StatementExecutePartitions = PostgresStatementExecutePartitions;
  driver->StatementExecuteQuery = PostgresStatementExecuteQuery;
  driver->StatementGetParameterSchema = PostgresStatementGetParameterSchema;
  driver->StatementNew = PostgresStatementNew;
  driver->StatementPrepare = PostgresStatementPrepare;
  driver->StatementRelease = PostgresStatementRelease;
  driver->StatementSetOption = PostgresStatementSetOption;
  driver->StatementSetSqlQuery = PostgresStatementSetSqlQuery;
  return ADBC_STATUS_OK;
}
}