c/driver/postgresql/database.cc (195 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "database.h"
#include <cinttypes>
#include <cstring>
#include <memory>
#include <utility>
#include <vector>
#include <adbc.h>
#include <libpq-fe.h>
#include <nanoarrow/nanoarrow.h>
#include "common/utils.h"
namespace adbcpq {
PostgresDatabase::PostgresDatabase() : open_connections_(0) {
type_resolver_ = std::make_shared<PostgresTypeResolver>();
}
PostgresDatabase::~PostgresDatabase() = default;
AdbcStatusCode PostgresDatabase::Init(struct AdbcError* error) {
// Connect to validate the parameters.
return RebuildTypeResolver(error);
}
AdbcStatusCode PostgresDatabase::Release(struct AdbcError* error) {
if (open_connections_ != 0) {
SetError(error, "%s%" PRId32 "%s", "[libpq] Database released with ",
open_connections_, " open connections");
return ADBC_STATUS_INVALID_STATE;
}
return ADBC_STATUS_OK;
}
AdbcStatusCode PostgresDatabase::SetOption(const char* key, const char* value,
struct AdbcError* error) {
if (strcmp(key, "uri") == 0) {
uri_ = value;
} else {
SetError(error, "%s%s", "[libpq] Unknown database option ", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
}
return ADBC_STATUS_OK;
}
AdbcStatusCode PostgresDatabase::Connect(PGconn** conn, struct AdbcError* error) {
if (uri_.empty()) {
SetError(error, "%s",
"[libpq] Must set database option 'uri' before creating a connection");
return ADBC_STATUS_INVALID_STATE;
}
*conn = PQconnectdb(uri_.c_str());
if (PQstatus(*conn) != CONNECTION_OK) {
SetError(error, "%s%s", "[libpq] Failed to connect: ", PQerrorMessage(*conn));
PQfinish(*conn);
*conn = nullptr;
return ADBC_STATUS_IO;
}
open_connections_++;
return ADBC_STATUS_OK;
}
AdbcStatusCode PostgresDatabase::Disconnect(PGconn** conn, struct AdbcError* error) {
PQfinish(*conn);
*conn = nullptr;
if (--open_connections_ < 0) {
SetError(error, "%s", "[libpq] Open connection count underflowed");
return ADBC_STATUS_INTERNAL;
}
return ADBC_STATUS_OK;
}
// Helpers for building the type resolver from queries
static inline int32_t InsertPgAttributeResult(
pg_result* result, const std::shared_ptr<PostgresTypeResolver>& resolver);
static inline int32_t InsertPgTypeResult(
pg_result* result, const std::shared_ptr<PostgresTypeResolver>& resolver);
AdbcStatusCode PostgresDatabase::RebuildTypeResolver(struct AdbcError* error) {
PGconn* conn = nullptr;
AdbcStatusCode final_status = Connect(&conn, error);
if (final_status != ADBC_STATUS_OK) {
return final_status;
}
// We need a few queries to build the resolver. The current strategy might
// fail for some recursive definitions (e.g., arrays of records of arrays).
// First, one on the pg_attribute table to resolve column names/oids for
// record types.
const std::string kColumnsQuery = R"(
SELECT
attrelid,
attname,
atttypid
FROM
pg_catalog.pg_attribute
ORDER BY
attrelid, attnum
)";
// Second, a query of the pg_type table. This query may need a few attempts to handle
// recursive definitions (e.g., record types with array column). This currently won't
// handle range types because those rows don't have child OID information. Arrays types
// are inserted after a successful insert of the element type.
const std::string kTypeQuery = R"(
SELECT
oid,
typname,
typreceive,
typbasetype,
typarray,
typrelid
FROM
pg_catalog.pg_type
WHERE
(typreceive != 0 OR typname = 'aclitem') AND typtype != 'r' AND typreceive::TEXT != 'array_recv'
ORDER BY
oid
)";
// Create a new type resolver (this instance's type_resolver_ member
// will be updated at the end if this succeeds).
auto resolver = std::make_shared<PostgresTypeResolver>();
// Insert record type definitions (this includes table schemas)
pg_result* result = PQexec(conn, kColumnsQuery.c_str());
ExecStatusType pq_status = PQresultStatus(result);
if (pq_status == PGRES_TUPLES_OK) {
InsertPgAttributeResult(result, resolver);
} else {
SetError(error, "%s%s",
"[libpq] Failed to build type mapping table: ", PQerrorMessage(conn));
final_status = ADBC_STATUS_IO;
}
PQclear(result);
// Attempt filling the resolver a few times to handle recursive definitions.
int32_t max_attempts = 3;
for (int32_t i = 0; i < max_attempts; i++) {
result = PQexec(conn, kTypeQuery.c_str());
ExecStatusType pq_status = PQresultStatus(result);
if (pq_status == PGRES_TUPLES_OK) {
InsertPgTypeResult(result, resolver);
} else {
SetError(error, "%s%s",
"[libpq] Failed to build type mapping table: ", PQerrorMessage(conn));
final_status = ADBC_STATUS_IO;
}
PQclear(result);
if (final_status != ADBC_STATUS_OK) {
break;
}
}
// Disconnect since PostgreSQL connections can be heavy.
{
AdbcStatusCode status = Disconnect(&conn, error);
if (status != ADBC_STATUS_OK) final_status = status;
}
if (final_status == ADBC_STATUS_OK) {
type_resolver_ = std::move(resolver);
}
return final_status;
}
static inline int32_t InsertPgAttributeResult(
pg_result* result, const std::shared_ptr<PostgresTypeResolver>& resolver) {
int num_rows = PQntuples(result);
std::vector<std::pair<std::string, uint32_t>> columns;
uint32_t current_type_oid = 0;
int32_t n_added = 0;
for (int row = 0; row < num_rows; row++) {
const uint32_t type_oid = static_cast<uint32_t>(
std::strtol(PQgetvalue(result, row, 0), /*str_end=*/nullptr, /*base=*/10));
const char* col_name = PQgetvalue(result, row, 1);
const uint32_t col_oid = static_cast<uint32_t>(
std::strtol(PQgetvalue(result, row, 2), /*str_end=*/nullptr, /*base=*/10));
if (type_oid != current_type_oid && !columns.empty()) {
resolver->InsertClass(current_type_oid, columns);
columns.clear();
current_type_oid = type_oid;
n_added++;
}
columns.push_back({col_name, col_oid});
}
if (!columns.empty()) {
resolver->InsertClass(current_type_oid, columns);
n_added++;
}
return n_added;
}
static inline int32_t InsertPgTypeResult(
pg_result* result, const std::shared_ptr<PostgresTypeResolver>& resolver) {
int num_rows = PQntuples(result);
PostgresTypeResolver::Item item;
int32_t n_added = 0;
for (int row = 0; row < num_rows; row++) {
const uint32_t oid = static_cast<uint32_t>(
std::strtol(PQgetvalue(result, row, 0), /*str_end=*/nullptr, /*base=*/10));
const char* typname = PQgetvalue(result, row, 1);
const char* typreceive = PQgetvalue(result, row, 2);
const uint32_t typbasetype = static_cast<uint32_t>(
std::strtol(PQgetvalue(result, row, 3), /*str_end=*/nullptr, /*base=*/10));
const uint32_t typarray = static_cast<uint32_t>(
std::strtol(PQgetvalue(result, row, 4), /*str_end=*/nullptr, /*base=*/10));
const uint32_t typrelid = static_cast<uint32_t>(
std::strtol(PQgetvalue(result, row, 5), /*str_end=*/nullptr, /*base=*/10));
// Special case the aclitem because it shows up in a bunch of internal tables
if (strcmp(typname, "aclitem") == 0) {
typreceive = "aclitem_recv";
}
item.oid = oid;
item.typname = typname;
item.typreceive = typreceive;
item.class_oid = typrelid;
item.base_oid = typbasetype;
int result = resolver->Insert(item, nullptr);
// If there's an array type and the insert succeeded, add that now too
if (result == NANOARROW_OK && typarray != 0) {
std::string array_typname = "_" + std::string(typname);
item.oid = typarray;
item.typname = array_typname.c_str();
item.typreceive = "array_recv";
item.child_oid = oid;
resolver->Insert(item, nullptr);
}
}
return n_added;
}
} // namespace adbcpq