c/driver/postgresql/statement.cc

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Windows
#define NOMINMAX

#include "statement.h"

#include <algorithm>
#include <array>
#include <cassert>
#include <cerrno>
#include <cinttypes>
#include <cstring>
#include <iostream>
#include <limits>
#include <memory>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include <arrow-adbc/adbc.h>
#include <libpq-fe.h>
#include <nanoarrow/nanoarrow.hpp>

#include "bind_stream.h"
#include "connection.h"
#include "driver/common/options.h"
#include "driver/common/utils.h"
#include "driver/framework/utility.h"
#include "error.h"
#include "postgres_type.h"
#include "postgres_util.h"
#include "result_helper.h"
#include "result_reader.h"

namespace adbcpq {

int TupleReader::GetSchema(struct ArrowSchema* out) {
  assert(copy_reader_ != nullptr);

  ArrowErrorInit(&na_error_);
  int na_res = copy_reader_->GetSchema(out);
  if (out->release == nullptr) {
    SetError(&error_, "[libpq] Result set was already consumed or freed");
    status_ = ADBC_STATUS_INVALID_STATE;
    return AdbcStatusCodeToErrno(status_);
  } else if (na_res != NANOARROW_OK) {
    // e.g., Can't allocate memory
    SetError(&error_, "[libpq] Error copying schema");
    status_ = ADBC_STATUS_INTERNAL;
  }

  return na_res;
}

int TupleReader::GetCopyData() {
  if (pgbuf_ != nullptr) {
    PQfreemem(pgbuf_);
    pgbuf_ = nullptr;
  }

  data_.size_bytes = 0;
  data_.data.as_char = nullptr;

  int get_copy_res = PQgetCopyData(conn_, &pgbuf_, /*async=*/0);

  if (get_copy_res == -2) {
    SetError(&error_, "[libpq] PQgetCopyData() failed: %s", PQerrorMessage(conn_));
    status_ = ADBC_STATUS_IO;
    return AdbcStatusCodeToErrno(status_);
  }

  if (get_copy_res == -1) {
    // Check the server-side response
    PQclear(result_);
    result_ = PQgetResult(conn_);
    const ExecStatusType pq_status = PQresultStatus(result_);
    if (pq_status != PGRES_COMMAND_OK) {
      status_ = SetError(&error_, result_, "[libpq] Execution error [%s]: %s",
                         PQresStatus(pq_status), PQresultErrorMessage(result_));
      return AdbcStatusCodeToErrno(status_);
    } else {
      return ENODATA;
    }
  }

  data_.size_bytes = get_copy_res;
  data_.data.as_char = pgbuf_;
  return NANOARROW_OK;
}

int TupleReader::AppendRowAndFetchNext() {
  // Parse the result (the header AND the first row are included in the first
  // call to PQgetCopyData())
  int na_res = copy_reader_->ReadRecord(&data_, &na_error_);
  if (na_res != NANOARROW_OK && na_res != ENODATA) {
    SetError(&error_, "[libpq] ReadRecord failed at row %" PRId64 ": %s", row_id_,
             na_error_.message);
    status_ = ADBC_STATUS_IO;
    return na_res;
  }

  row_id_++;

  NANOARROW_RETURN_NOT_OK(GetCopyData());
  if ((copy_reader_->array_size_approx_bytes() + data_.size_bytes) >=
      batch_size_hint_bytes_) {
    // Appending the next row will result in an array larger than requested.
    // Return EOVERFLOW to force GetNext() to build the current result and return.
    return EOVERFLOW;
  }

  return NANOARROW_OK;
}

int TupleReader::BuildOutput(struct ArrowArray* out) {
  if (copy_reader_->array_size_approx_bytes() == 0) {
    out->release = nullptr;
    return NANOARROW_OK;
  }

  int na_res = copy_reader_->GetArray(out, &na_error_);
  if (na_res != NANOARROW_OK) {
    SetError(&error_, "[libpq] Failed to build result array: %s", na_error_.message);
    status_ = ADBC_STATUS_INTERNAL;
    return na_res;
  }

  return NANOARROW_OK;
}

int TupleReader::GetNext(struct ArrowArray* out) {
  if (is_finished_) {
    out->release = nullptr;
    return 0;
  }

  int na_res;
  ArrowErrorInit(&na_error_);

  if (row_id_ == -1) {
    na_res = GetCopyData();
    if (na_res == ENODATA) {
      is_finished_ = true;
      out->release = nullptr;
      return 0;
    } else if (na_res != NANOARROW_OK) {
      return na_res;
    }

    na_res = copy_reader_->ReadHeader(&data_, &na_error_);
    if (na_res != NANOARROW_OK) {
      SetError(&error_, "[libpq] ReadHeader() failed: %s", na_error_.message);
      return na_res;
    }

    row_id_++;
  }

  do {
    na_res = AppendRowAndFetchNext();
    if (na_res == EOVERFLOW) {
      // The result would be too big to return if we appended the row. When EOVERFLOW
      // is returned, the copy reader leaves the output in a valid state. The data is
      // left in pg_buf_/data_ and will attempt to be appended on the next call to
      // GetNext()
      return BuildOutput(out);
    }
  } while (na_res == NANOARROW_OK);

  if (na_res != ENODATA) {
    return na_res;
  }

  is_finished_ = true;

  // Finish the result properly and return the last result. Note that BuildOutput() may
  // set tmp.release = nullptr if there were zero rows in the copy reader (can
  // occur in an overflow scenario).
  NANOARROW_RETURN_NOT_OK(BuildOutput(out));
  return NANOARROW_OK;
}

void TupleReader::Release() {
  if (error_.release) {
    error_.release(&error_);
  }
  error_ = ADBC_ERROR_INIT;
  status_ = ADBC_STATUS_OK;

  if (result_) {
    PQclear(result_);
    result_ = nullptr;
  }

  if (pgbuf_) {
    PQfreemem(pgbuf_);
    pgbuf_ = nullptr;
  }

  if (copy_reader_) {
    copy_reader_.reset();
  }

  is_finished_ = false;
  row_id_ = -1;
}

// Instead of directly exporting the TupleReader, which is tied to the
// lifetime of the Statement, we export a weak_ptr reference instead. That
// way if the user accidentally closes the Statement before the
// ArrowArrayStream, we can avoid a crash.
// See https://github.com/apache/arrow-adbc/issues/2629
struct ExportedTupleReader {
  std::weak_ptr<TupleReader> self;
};

void TupleReader::ExportTo(struct ArrowArrayStream* stream) {
  stream->get_schema = &GetSchemaTrampoline;
  stream->get_next = &GetNextTrampoline;
  stream->get_last_error = &GetLastErrorTrampoline;
  stream->release = &ReleaseTrampoline;
  stream->private_data = new ExportedTupleReader{weak_from_this()};
}

const struct AdbcError* TupleReader::ErrorFromArrayStream(struct ArrowArrayStream* self,
                                                          AdbcStatusCode* status) {
  if (!self->private_data || self->release != &ReleaseTrampoline) {
    return nullptr;
  }

  auto* wrapper = static_cast<ExportedTupleReader*>(self->private_data);
  auto maybe_reader = wrapper->self.lock();
  if (maybe_reader) {
    if (status) {
      *status = maybe_reader->status_;
    }
    return &maybe_reader->error_;
  }
  return nullptr;
}

int TupleReader::GetSchemaTrampoline(struct ArrowArrayStream* self,
                                     struct ArrowSchema* out) {
  if (!self || !self->private_data) return EINVAL;

  auto* wrapper = static_cast<ExportedTupleReader*>(self->private_data);
  auto maybe_reader = wrapper->self.lock();
  if (maybe_reader) {
    return maybe_reader->GetSchema(out);
  }

  // statement was closed or reader was otherwise invalidated
  return EINVAL;
}

int TupleReader::GetNextTrampoline(struct ArrowArrayStream* self,
                                   struct ArrowArray* out) {
  if (!self || !self->private_data) return EINVAL;

  auto* wrapper = static_cast<ExportedTupleReader*>(self->private_data);
  auto maybe_reader = wrapper->self.lock();
  if (maybe_reader) {
    return maybe_reader->GetNext(out);
  }

  // statement was closed or reader was otherwise invalidated
  return EINVAL;
}

const char* TupleReader::GetLastErrorTrampoline(struct ArrowArrayStream* self) {
  if (!self || !self->private_data) return nullptr;

  constexpr std::string_view kReaderInvalidated =
      "[libpq] Reader invalidated (statement or reader was closed)";

  auto* wrapper = static_cast<ExportedTupleReader*>(self->private_data);
  auto maybe_reader = wrapper->self.lock();
  if (maybe_reader) {
    return maybe_reader->last_error();
  }

  // statement was closed or reader was otherwise invalidated
  return kReaderInvalidated.data();
}

void TupleReader::ReleaseTrampoline(struct ArrowArrayStream* self) {
  if (!self || !self->private_data) return;

  auto* wrapper = static_cast<ExportedTupleReader*>(self->private_data);
  auto maybe_reader = wrapper->self.lock();
  if (maybe_reader) {
    maybe_reader->Release();
  }

  delete wrapper;
  self->private_data = nullptr;
  self->release = nullptr;
}

AdbcStatusCode PostgresStatement::New(struct AdbcConnection* connection,
                                      struct AdbcError* error) {
  if (!connection || !connection->private_data) {
    SetError(error, "%s", "[libpq] Must provide an initialized AdbcConnection");
    return ADBC_STATUS_INVALID_ARGUMENT;
  }
  connection_ =
      *reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
  type_resolver_ = connection_->type_resolver();
  ClearResult();
  return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresStatement::Bind(struct ArrowArray* values,
                                       struct ArrowSchema* schema,
                                       struct AdbcError* error) {
  if (!values || !values->release) {
    SetError(error, "%s", "[libpq] Must provide non-NULL array");
    return ADBC_STATUS_INVALID_ARGUMENT;
  } else if (!schema || !schema->release) {
    SetError(error, "%s", "[libpq] Must provide non-NULL schema");
    return ADBC_STATUS_INVALID_ARGUMENT;
  }

  if (bind_.release) bind_.release(&bind_);
  // Make a one-value stream
  adbc::driver::MakeArrayStream(schema, values, &bind_);
  return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresStatement::Bind(struct ArrowArrayStream* stream,
                                       struct AdbcError* error) {
  if (!stream || !stream->release) {
    SetError(error, "%s", "[libpq] Must provide non-NULL stream");
    return ADBC_STATUS_INVALID_ARGUMENT;
  }
  // Move stream
  if (bind_.release) bind_.release(&bind_);
  bind_ = *stream;
  std::memset(stream, 0, sizeof(*stream));
  return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresStatement::Cancel(struct AdbcError* error) {
  // Ultimately the same underlying PGconn
  return connection_->Cancel(error);
}

AdbcStatusCode PostgresStatement::CreateBulkTable(
    const std::string& current_schema, const struct ArrowSchema& source_schema,
    std::string* escaped_table, std::string* escaped_field_list,
    struct AdbcError* error) {
  PGconn* conn = connection_->conn();

  if (!ingest_.db_schema.empty() && ingest_.temporary) {
    SetError(error, "[libpq] Cannot set both %s and %s",
             ADBC_INGEST_OPTION_TARGET_DB_SCHEMA, ADBC_INGEST_OPTION_TEMPORARY);
    return ADBC_STATUS_INVALID_STATE;
  }

  {
    if (!ingest_.db_schema.empty()) {
      char* escaped =
          PQescapeIdentifier(conn, ingest_.db_schema.c_str(), ingest_.db_schema.size());
      if (escaped == nullptr) {
        SetError(error, "[libpq] Failed to escape target schema %s for ingestion: %s",
                 ingest_.db_schema.c_str(), PQerrorMessage(conn));
        return ADBC_STATUS_INTERNAL;
      }
      *escaped_table += escaped;
      *escaped_table += " . ";
      PQfreemem(escaped);
    } else if (ingest_.temporary) {
      // OK to be redundant (CREATE TEMPORARY TABLE pg_temp.foo)
      *escaped_table += "pg_temp . ";
    } else {
      // Explicitly specify the current schema to avoid any temporary tables
      // shadowing this table
      char* escaped =
          PQescapeIdentifier(conn, current_schema.c_str(), current_schema.size());
      *escaped_table += escaped;
      *escaped_table += " . ";
      PQfreemem(escaped);
    }

    if (!ingest_.target.empty()) {
      char* escaped =
          PQescapeIdentifier(conn, ingest_.target.c_str(), ingest_.target.size());
      if (escaped == nullptr) {
        SetError(error, "[libpq] Failed to escape target table %s for ingestion: %s",
                 ingest_.target.c_str(), PQerrorMessage(conn));
        return ADBC_STATUS_INTERNAL;
      }
      *escaped_table += escaped;
      PQfreemem(escaped);
    }
  }

  std::string create;

  if (ingest_.temporary) {
    create = "CREATE TEMPORARY TABLE ";
  } else {
    create = "CREATE TABLE ";
  }

  switch (ingest_.mode) {
    case IngestMode::kCreate:
    case IngestMode::kAppend:
      // Nothing to do
      break;
    case IngestMode::kReplace: {
      std::string drop = "DROP TABLE IF EXISTS " + *escaped_table;
      PGresult* result = PQexecParams(conn, drop.c_str(), /*nParams=*/0,
                                      /*paramTypes=*/nullptr, /*paramValues=*/nullptr,
                                      /*paramLengths=*/nullptr, /*paramFormats=*/nullptr,
                                      /*resultFormat=*/1 /*(binary)*/);
      if (PQresultStatus(result) != PGRES_COMMAND_OK) {
        AdbcStatusCode code =
            SetError(error, result, "[libpq] Failed to drop table: %s\nQuery was: %s",
                     PQerrorMessage(conn), drop.c_str());
        PQclear(result);
        return code;
      }
      PQclear(result);
      break;
    }
    case IngestMode::kCreateAppend:
      create += "IF NOT EXISTS ";
      break;
  }
  create += *escaped_table;
  create += " (";

  for (int64_t i = 0; i < source_schema.n_children; i++) {
    if (i > 0) {
      create += ", ";
      *escaped_field_list += ", ";
    }

    const char* unescaped = source_schema.children[i]->name;
    char* escaped = PQescapeIdentifier(conn, unescaped, std::strlen(unescaped));
    if (escaped == nullptr) {
      SetError(error, "[libpq] Failed to escape column %s for ingestion: %s", unescaped,
               PQerrorMessage(conn));
      return ADBC_STATUS_INTERNAL;
    }
    create += escaped;
    *escaped_field_list += escaped;
    PQfreemem(escaped);

    PostgresType pg_type;
    struct ArrowError na_error;
    CHECK_NA_DETAIL(INTERNAL,
                    PostgresType::FromSchema(*type_resolver_, source_schema.children[i],
                                             &pg_type, &na_error),
                    &na_error, error);
    create += " " + pg_type.sql_type_name();
  }

  if (ingest_.mode == IngestMode::kAppend) {
    return ADBC_STATUS_OK;
  }

  create += ")";
  SetError(error, "%s%s", "[libpq] ", create.c_str());
  PGresult* result = PQexecParams(conn, create.c_str(), /*nParams=*/0,
                                  /*paramTypes=*/nullptr, /*paramValues=*/nullptr,
                                  /*paramLengths=*/nullptr, /*paramFormats=*/nullptr,
                                  /*resultFormat=*/1 /*(binary)*/);
  if (PQresultStatus(result) != PGRES_COMMAND_OK) {
    AdbcStatusCode code =
        SetError(error, result, "[libpq] Failed to create table: %s\nQuery was: %s",
                 PQerrorMessage(conn), create.c_str());
    PQclear(result);
    return code;
  }
  PQclear(result);
  return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresStatement::ExecuteBind(struct ArrowArrayStream* stream,
                                              int64_t* rows_affected,
                                              struct AdbcError* error) {
  PqResultArrayReader reader(connection_->conn(), type_resolver_, query_);
  reader.SetAutocommit(connection_->autocommit());
  reader.SetBind(&bind_);
  reader.SetVendorName(connection_->VendorName());
  RAISE_STATUS(error, reader.ToArrayStream(rows_affected, stream));
  return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresStatement::ExecuteQuery(struct ArrowArrayStream* stream,
                                               int64_t* rows_affected,
                                               struct AdbcError* error) {
  ClearResult();

  // Use a dedicated path to handle bulk ingest
  if (!ingest_.target.empty()) {
    return ExecuteIngest(stream, rows_affected, error);
  }

  if (query_.empty()) {
    SetError(error, "%s", "[libpq] Must SetSqlQuery before ExecuteQuery");
    return ADBC_STATUS_INVALID_STATE;
  }

  // Use a dedicated path to handle parameter binding
  if (bind_.release != nullptr) {
    return ExecuteBind(stream, rows_affected, error);
  }

  // If we have been requested to avoid COPY or there is no output requested,
  // execute using the PqResultArrayReader.
  if (!stream || !UseCopy()) {
    PqResultArrayReader reader(connection_->conn(), type_resolver_, query_);
    reader.SetVendorName(connection_->VendorName());
    RAISE_STATUS(error, reader.ToArrayStream(rows_affected, stream));
    return ADBC_STATUS_OK;
  }

  PqResultHelper helper(connection_->conn(), query_);
  RAISE_STATUS(error, helper.Prepare());
  RAISE_STATUS(error, helper.DescribePrepared());

  // Initialize the copy reader and infer the output schema (i.e., error for
  // unsupported types before issuing the COPY query). This could be lazier
  // (i.e., executed on the first call to GetSchema() or GetNext()).
  PostgresType root_type;
  RAISE_STATUS(error, helper.ResolveOutputTypes(*type_resolver_, &root_type));

  // If there will be no columns in the result, we can also avoid COPY
  if (root_type.n_children() == 0) {
    // Could/should move the helper into the reader instead of repreparing
    PqResultArrayReader reader(connection_->conn(), type_resolver_, query_);
    reader.SetVendorName(connection_->VendorName());
    RAISE_STATUS(error, reader.ToArrayStream(rows_affected, stream));
    return ADBC_STATUS_OK;
  }

  struct ArrowError na_error;
  reader_->copy_reader_ = std::make_unique<PostgresCopyStreamReader>();
  CHECK_NA(INTERNAL, reader_->copy_reader_->Init(root_type), error);
  CHECK_NA_DETAIL(INTERNAL,
                  reader_->copy_reader_->InferOutputSchema(
                      std::string(connection_->VendorName()), &na_error),
                  &na_error, error);

  CHECK_NA_DETAIL(INTERNAL, reader_->copy_reader_->InitFieldReaders(&na_error),
                  &na_error, error);

  // Execute the COPY query
  RAISE_STATUS(error, helper.ExecuteCopy());

  // We need the PQresult back for the reader
  reader_->result_ = helper.ReleaseResult();

  // Export to stream
  reader_->ExportTo(stream);
  if (rows_affected) *rows_affected = -1;
  return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresStatement::ExecuteSchema(struct ArrowSchema* schema,
                                                struct AdbcError* error) {
  ClearResult();
  if (query_.empty()) {
    SetError(error, "%s", "[libpq] Must SetSqlQuery before ExecuteQuery");
    return ADBC_STATUS_INVALID_STATE;
  }

  PqResultHelper helper(connection_->conn(), query_);

  if (bind_.release) {
    nanoarrow::UniqueSchema param_schema;
    struct ArrowError na_error;
    ArrowErrorInit(&na_error);
    CHECK_NA_DETAIL(INTERNAL,
                    ArrowArrayStreamGetSchema(&bind_, param_schema.get(), &na_error),
                    &na_error, error);

    if (std::string(param_schema->format) != "+s") {
      SetError(error, "%s", "[libpq] Bind parameters must have type STRUCT");
      return ADBC_STATUS_INVALID_STATE;
    }

    std::vector<Oid> param_oids(param_schema->n_children);
    for (int64_t i = 0; i < param_schema->n_children; i++) {
      PostgresType pg_type;
      CHECK_NA_DETAIL(INTERNAL,
                      PostgresType::FromSchema(*type_resolver_,
                                               param_schema->children[i], &pg_type,
                                               &na_error),
                      &na_error, error);
      param_oids[i] = pg_type.oid();
    }

    RAISE_STATUS(error, helper.Prepare(param_oids));
  } else {
    RAISE_STATUS(error, helper.Prepare());
  }

  RAISE_STATUS(error, helper.DescribePrepared());
  PostgresType output_type;
  RAISE_STATUS(error, helper.ResolveOutputTypes(*type_resolver_, &output_type));

  nanoarrow::UniqueSchema tmp;
  ArrowSchemaInit(tmp.get());
  CHECK_NA(INTERNAL,
           output_type.SetSchema(tmp.get(), std::string(connection_->VendorName())),
           error);

  tmp.move(schema);
  return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresStatement::ExecuteIngest(struct ArrowArrayStream* stream,
                                                int64_t* rows_affected,
                                                struct AdbcError* error) {
  if (!bind_.release) {
    SetError(error, "%s", "[libpq] Must Bind() before Execute() for bulk ingestion");
    return ADBC_STATUS_INVALID_STATE;
  }

  if (stream != nullptr) {
    SetError(error, "%s", "[libpq] Bulk ingest with result set is not supported");
    return ADBC_STATUS_NOT_IMPLEMENTED;
  }

  // Need the current schema to avoid being shadowed by temp tables
  // This is a little unfortunate; we need another DB roundtrip
  std::string current_schema;
  {
    PqResultHelper result_helper{connection_->conn(), "SELECT CURRENT_SCHEMA()"};
    RAISE_STATUS(error, result_helper.Execute());
    auto it = result_helper.begin();
    if (it == result_helper.end()) {
      SetError(error,
               "[libpq] PostgreSQL returned no rows for 'SELECT CURRENT_SCHEMA()'");
      return ADBC_STATUS_INTERNAL;
    }
    current_schema = (*it)[0].data;
  }

  BindStream bind_stream;
  bind_stream.SetBind(&bind_);
  std::memset(&bind_, 0, sizeof(bind_));
  std::string escaped_table;
  std::string escaped_field_list;

  RAISE_STATUS(error, bind_stream.Begin([&]() -> Status {
    struct AdbcError tmp_error = ADBC_ERROR_INIT;
    AdbcStatusCode status_code =
        CreateBulkTable(current_schema, bind_stream.bind_schema.value, &escaped_table,
                        &escaped_field_list, &tmp_error);
    return Status::FromAdbc(status_code, tmp_error);
  }));

  std::string query = "COPY ";
  query += escaped_table;
  query += " (";
  query += escaped_field_list;
  query += ") FROM STDIN WITH (FORMAT binary)";
  PGresult* result = PQexec(connection_->conn(), query.c_str());
  if (PQresultStatus(result) != PGRES_COPY_IN) {
    AdbcStatusCode code =
        SetError(error, result, "[libpq] COPY query failed: %s\nQuery was:%s",
                 PQerrorMessage(connection_->conn()), query.c_str());
    PQclear(result);
    return code;
  }
  PQclear(result);

  RAISE_STATUS(error,
               bind_stream.ExecuteCopy(connection_->conn(), *connection_->type_resolver(),
                                       rows_affected));
  return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresStatement::GetOption(const char* key, char* value, size_t* length,
                                            struct AdbcError* error) {
  std::string result;
  if (std::strcmp(key, ADBC_INGEST_OPTION_TARGET_TABLE) == 0) {
    result = ingest_.target;
  } else if (std::strcmp(key, ADBC_INGEST_OPTION_TARGET_DB_SCHEMA) == 0) {
    result = ingest_.db_schema;
  } else if (std::strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) {
    switch (ingest_.mode) {
      case IngestMode::kCreate:
        result = ADBC_INGEST_OPTION_MODE_CREATE;
        break;
      case IngestMode::kAppend:
        result = ADBC_INGEST_OPTION_MODE_APPEND;
        break;
      case IngestMode::kReplace:
        result = ADBC_INGEST_OPTION_MODE_REPLACE;
        break;
      case IngestMode::kCreateAppend:
        result = ADBC_INGEST_OPTION_MODE_CREATE_APPEND;
        break;
    }
  } else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) == 0) {
    result = std::to_string(reader_->batch_size_hint_bytes_);
  } else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_USE_COPY) == 0) {
    if (UseCopy()) {
      result = "true";
    } else {
      result = "false";
    }
  } else {
    SetError(error, "[libpq] Unknown statement option '%s'", key);
    return ADBC_STATUS_NOT_FOUND;
  }

  if (result.size() + 1 <= *length) {
    std::memcpy(value, result.data(), result.size() + 1);
  }
  *length = static_cast<int64_t>(result.size() + 1);
  return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresStatement::GetOptionBytes(const char* key, uint8_t* value,
                                                 size_t* length,
                                                 struct AdbcError* error) {
  SetError(error, "[libpq] Unknown statement option '%s'", key);
  return ADBC_STATUS_NOT_FOUND;
}

AdbcStatusCode PostgresStatement::GetOptionDouble(const char* key, double* value,
                                                  struct AdbcError* error) {
  SetError(error, "[libpq] Unknown statement option '%s'", key);
  return ADBC_STATUS_NOT_FOUND;
}

AdbcStatusCode PostgresStatement::GetOptionInt(const char* key, int64_t* value,
                                               struct AdbcError* error) {
  std::string result;
  if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) == 0) {
    *value = reader_->batch_size_hint_bytes_;
    return ADBC_STATUS_OK;
  }
  SetError(error, "[libpq] Unknown statement option '%s'", key);
  return ADBC_STATUS_NOT_FOUND;
}

AdbcStatusCode PostgresStatement::GetParameterSchema(struct ArrowSchema* schema,
                                                     struct AdbcError* error) {
  return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode PostgresStatement::Prepare(struct AdbcError* error) {
  if (query_.empty()) {
    SetError(error, "%s", "[libpq] Must SetSqlQuery() before Prepare()");
    return ADBC_STATUS_INVALID_STATE;
  }

  // Don't actually prepare until execution time, so we know the
  // parameter types
  prepared_ = true;
  return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresStatement::Release(struct AdbcError* error) {
  ClearResult();
  if (bind_.release) {
    bind_.release(&bind_);
  }
  return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresStatement::SetSqlQuery(const char* query,
                                              struct AdbcError* error) {
  ingest_.target.clear();
  ingest_.db_schema.clear();
  query_ = query;
  prepared_ = false;
  return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresStatement::SetOption(const char* key, const char* value,
                                            struct AdbcError* error) {
  if (std::strcmp(key, ADBC_INGEST_OPTION_TARGET_TABLE) == 0) {
    query_.clear();
    ingest_.target = value;
    prepared_ = false;
  } else if (std::strcmp(key, ADBC_INGEST_OPTION_TARGET_DB_SCHEMA) == 0) {
    query_.clear();
    if (value == nullptr) {
      ingest_.db_schema.clear();
    } else {
      ingest_.db_schema = value;
    }
    prepared_ = false;
  } else if (std::strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) {
    if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_CREATE) == 0) {
      ingest_.mode = IngestMode::kCreate;
    } else if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_APPEND) == 0) {
      ingest_.mode = IngestMode::kAppend;
    } else if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_REPLACE) == 0) {
      ingest_.mode = IngestMode::kReplace;
    } else if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_CREATE_APPEND) == 0) {
      ingest_.mode = IngestMode::kCreateAppend;
    } else {
      SetError(error, "[libpq] Invalid value '%s' for option '%s'", value, key);
      return ADBC_STATUS_INVALID_ARGUMENT;
    }
    prepared_ = false;
  } else if (std::strcmp(key, ADBC_INGEST_OPTION_TEMPORARY) == 0) {
    if (std::strcmp(value, ADBC_OPTION_VALUE_ENABLED) == 0) {
      // https://github.com/apache/arrow-adbc/issues/1109: only clear the
      // schema if enabling since Python always sets the flag explicitly
      ingest_.temporary = true;
      ingest_.db_schema.clear();
    } else if (std::strcmp(value, ADBC_OPTION_VALUE_DISABLED) == 0) {
      ingest_.temporary = false;
    } else {
      SetError(error, "[libpq] Invalid value '%s' for option '%s'", value, key);
      return ADBC_STATUS_INVALID_ARGUMENT;
    }
    prepared_ = false;
  } else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) == 0) {
    int64_t int_value = std::atol(value);
    if (int_value <= 0) {
      SetError(error, "[libpq] Invalid value '%s' for option '%s'", value, key);
      return ADBC_STATUS_INVALID_ARGUMENT;
    }

    this->batch_size_hint_bytes_ = this->reader_->batch_size_hint_bytes_ = int_value;
  } else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_USE_COPY) == 0) {
    if (std::strcmp(value, ADBC_OPTION_VALUE_ENABLED) == 0) {
      use_copy_ = true;
    } else if (std::strcmp(value, ADBC_OPTION_VALUE_DISABLED) == 0) {
      use_copy_ = false;
    } else {
      SetError(error, "[libpq] Invalid value '%s' for option '%s'", value, key);
      return ADBC_STATUS_INVALID_ARGUMENT;
    }
  } else {
    SetError(error, "[libpq] Unknown statement option '%s'", key);
    return ADBC_STATUS_NOT_IMPLEMENTED;
  }
  return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresStatement::SetOptionBytes(const char* key, const uint8_t* value,
                                                 size_t length,
                                                 struct AdbcError* error) {
  SetError(error, "%s%s", "[libpq] Unknown statement option ", key);
  return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode PostgresStatement::SetOptionDouble(const char* key, double value,
                                                  struct AdbcError* error) {
  SetError(error, "%s%s", "[libpq] Unknown statement option ", key);
  return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode PostgresStatement::SetOptionInt(const char* key, int64_t value,
                                               struct AdbcError* error) {
  if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) == 0) {
    if (value <= 0) {
      SetError(error, "[libpq] Invalid value '%" PRIi64 "' for option '%s'", value, key);
      return ADBC_STATUS_INVALID_ARGUMENT;
    }

    this->batch_size_hint_bytes_ = this->reader_->batch_size_hint_bytes_ = value;
    return ADBC_STATUS_OK;
  }

  SetError(error, "[libpq] Unknown statement option '%s'", key);
  return ADBC_STATUS_NOT_IMPLEMENTED;
}

void PostgresStatement::ClearResult() {
  // TODO: we may want to synchronize here for safety
  if (reader_) reader_->Release();
  reader_ = std::make_shared<TupleReader>(connection_->conn());
  reader_->batch_size_hint_bytes_ = batch_size_hint_bytes_;
}

int PostgresStatement::UseCopy() {
  if (use_copy_ == -1) {
    return connection_->VendorName() != "Redshift";
  } else {
    return use_copy_;
  }
}
}  // namespace adbcpq
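
// ---------------------------------------------------------------------------
// Illustrative usage (not part of statement.cc): a minimal sketch of how a
// client might drive this statement implementation through the public ADBC C
// API from <arrow-adbc/adbc.h>. The query text, the RunQuery() helper, and the
// abbreviated error handling are assumptions for illustration only; the ADBC
// entry points shown (AdbcStatementNew, AdbcStatementSetSqlQuery,
// AdbcStatementExecuteQuery, AdbcStatementRelease) and the ArrowArrayStream
// callbacks are the standard ones the driver implements above.
//
//   #include <arrow-adbc/adbc.h>
//
//   // Assumes `connection` was already initialized against this driver.
//   void RunQuery(struct AdbcConnection* connection, struct AdbcError* error) {
//     struct AdbcStatement statement = {};
//     AdbcStatementNew(connection, &statement, error);          // PostgresStatement::New
//     AdbcStatementSetSqlQuery(&statement, "SELECT 1", error);  // PostgresStatement::SetSqlQuery
//
//     struct ArrowArrayStream stream = {};
//     int64_t rows_affected = -1;
//     // ExecuteQuery prepares the statement, issues the COPY query (unless
//     // disabled via ADBC_POSTGRESQL_OPTION_USE_COPY), and exports a
//     // TupleReader behind `stream` via TupleReader::ExportTo().
//     AdbcStatementExecuteQuery(&statement, &stream, &rows_affected, error);
//
//     struct ArrowSchema schema = {};
//     struct ArrowArray batch = {};
//     stream.get_schema(&stream, &schema);  // TupleReader::GetSchemaTrampoline
//     // Batches are sized by ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES;
//     // get_next() returns a released array once the COPY stream is exhausted.
//     while (stream.get_next(&stream, &batch) == 0 && batch.release != nullptr) {
//       // ... consume the Arrow batch ...
//       batch.release(&batch);
//     }
//     schema.release(&schema);
//     stream.release(&stream);  // TupleReader::ReleaseTrampoline
//     AdbcStatementRelease(&statement, error);
//   }
// ---------------------------------------------------------------------------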