in c/driver/postgresql/statement.cc [881:994]
/// \brief Execute the current statement, optionally exporting a result stream.
///
/// Dispatch order, as implemented below:
///   1. A prepared statement with bound parameters, or with no result stream
///      requested, is delegated to ExecutePreparedStatement().
///   2. With no result stream and a bulk-ingest target set, the call is
///      delegated to ExecuteUpdateBulk().
///   3. Otherwise the query is prepared and described to infer its output
///      schema. If no result set was requested or the query has no output
///      columns, it is run through ExecuteUpdateQuery() (and, when `stream` is
///      non-null, an empty stream carrying the inferred schema is exported).
///      Otherwise the query is wrapped in COPY ... TO STDOUT (FORMAT binary)
///      and the reader is exported to `stream`.
///
/// \param[out] stream        Receives the result stream; may be null when the
///                           caller does not want a result set.
/// \param[out] rows_affected If non-null, set to -1 on the COPY path; on the
///                           other paths it is forwarded to the delegated call.
/// \param[out] error         Populated via SetError() on failure.
/// \return ADBC_STATUS_OK on success; ADBC_STATUS_INVALID_STATE if no query was
///         set; ADBC_STATUS_IO if preparing, describing, or starting COPY
///         fails; ADBC_STATUS_INTERNAL if nanoarrow schema inference or field
///         reader setup fails; otherwise the status of the delegated call.
AdbcStatusCode PostgresStatement::ExecuteQuery(struct ArrowArrayStream* stream,
                                               int64_t* rows_affected,
                                               struct AdbcError* error) {
  ClearResult();
  if (prepared_) {
    if (bind_.release || !stream) {
      return ExecutePreparedStatement(stream, rows_affected, error);
    }
    // XXX: don't use a prepared statement to execute a no-parameter
    // result-set-returning query for now, since we can't easily get
    // access to COPY there. (This might have to become sequential
    // executions of COPY (EXECUTE ($n, ...)) TO STDOUT which won't
    // get all the benefits of a prepared statement.) At preparation
    // time we don't know whether the query will be used with a result
    // set or not without analyzing the query (we could prepare both?)
    // and https://stackoverflow.com/questions/69233792 suggests that
    // you can't PREPARE a query containing COPY.
  }

  if (!stream && !ingest_.target.empty()) {
    return ExecuteUpdateBulk(rows_affected, error);
  }

  if (query_.empty()) {
    SetError(error, "%s", "[libpq] Must SetSqlQuery before ExecuteQuery");
    return ADBC_STATUS_INVALID_STATE;
  }

  // 1. Prepare the query to get the schema
  {
    // TODO: we should pipeline here and assume this will succeed
    PGresult* result = PQprepare(connection_->conn(), /*stmtName=*/"", query_.c_str(),
                                 /*nParams=*/0, nullptr);
    if (PQresultStatus(result) != PGRES_COMMAND_OK) {
      SetError(error,
               "[libpq] Failed to execute query: could not infer schema: failed to "
               "prepare query: %s\nQuery was:%s",
               PQerrorMessage(connection_->conn()), query_.c_str());
      PQclear(result);
      return ADBC_STATUS_IO;
    }
    PQclear(result);

    result = PQdescribePrepared(connection_->conn(), /*stmtName=*/"");
    if (PQresultStatus(result) != PGRES_COMMAND_OK) {
      SetError(error,
               "[libpq] Failed to execute query: could not infer schema: failed to "
               "describe prepared statement: %s\nQuery was:%s",
               PQerrorMessage(connection_->conn()), query_.c_str());
      PQclear(result);
      return ADBC_STATUS_IO;
    }

    // Resolve the information from the PGresult into a PostgresType
    PostgresType root_type;
    AdbcStatusCode status =
        ResolvePostgresType(*type_resolver_, result, &root_type, error);
    // The describe result is no longer needed whether or not resolution worked.
    PQclear(result);
    if (status != ADBC_STATUS_OK) return status;

    // Initialize the copy reader and infer the output schema (i.e., error for
    // unsupported types before issuing the COPY query)
    reader_.copy_reader_.reset(new PostgresCopyStreamReader());
    reader_.copy_reader_->Init(root_type);

    // Zero-initialize so the message is a valid (empty) string even if a
    // failing call does not fill it in.
    struct ArrowError na_error = {};
    int na_res = reader_.copy_reader_->InferOutputSchema(&na_error);
    if (na_res != NANOARROW_OK) {
      // na_res is a nanoarrow error code, not an AdbcStatusCode: report it in
      // the message and return a proper ADBC status instead of the raw value.
      SetError(error, "[libpq] Failed to infer output schema: (%d) %s", na_res,
               na_error.message);
      return ADBC_STATUS_INTERNAL;
    }

    // If the caller did not request a result set or if there are no
    // inferred output columns (e.g. a CREATE or UPDATE), then don't
    // use COPY (which would fail anyways)
    if (!stream || root_type.n_children() == 0) {
      RAISE_ADBC(ExecuteUpdateQuery(rows_affected, error));
      if (stream) {
        // The caller still expects a stream: export an empty one that carries
        // the schema obtained from the copy reader.
        struct ArrowSchema schema;
        std::memset(&schema, 0, sizeof(schema));
        RAISE_NA(reader_.copy_reader_->GetSchema(&schema));
        nanoarrow::EmptyArrayStream::MakeUnique(&schema).move(stream);
      }
      return ADBC_STATUS_OK;
    }

    // This resolves the reader specific to each PostgresType -> ArrowSchema
    // conversion. It is unlikely that this will fail given that we have just
    // inferred these conversions ourselves.
    na_res = reader_.copy_reader_->InitFieldReaders(&na_error);
    if (na_res != NANOARROW_OK) {
      // Same as above: never leak a nanoarrow code through the ADBC status.
      SetError(error, "[libpq] Failed to initialize field readers: (%d) %s", na_res,
               na_error.message);
      return ADBC_STATUS_INTERNAL;
    }
  }

  // 2. Execute the query with COPY to get binary tuples
  {
    std::string copy_query = "COPY (" + query_ + ") TO STDOUT (FORMAT binary)";
    reader_.result_ =
        PQexecParams(connection_->conn(), copy_query.c_str(), /*nParams=*/0,
                     /*paramTypes=*/nullptr, /*paramValues=*/nullptr,
                     /*paramLengths=*/nullptr, /*paramFormats=*/nullptr, kPgBinaryFormat);
    if (PQresultStatus(reader_.result_) != PGRES_COPY_OUT) {
      SetError(error,
               "[libpq] Failed to execute query: could not begin COPY: %s\nQuery was: %s",
               PQerrorMessage(connection_->conn()), copy_query.c_str());
      ClearResult();
      return ADBC_STATUS_IO;
    }
    // Result is read from the connection, not the result, but we won't clear it here
  }

  // `stream` is non-null here: the null case returned in step 1.
  reader_.ExportTo(stream);
  if (rows_affected) *rows_affected = -1;
  return ADBC_STATUS_OK;
}