AdbcStatusCode PostgresStatement::ExecuteQuery()

in c/driver/postgresql/statement.cc [881:994]


/// Execute the statement's query, optionally exporting a result set.
///
/// Dispatch order:
///   1. A prepared statement with bound parameters, or without a requested
///      result stream, is handed to ExecutePreparedStatement().
///   2. With no result stream and a bulk-ingest target set, the call is
///      handed to ExecuteUpdateBulk().
///   3. Otherwise the query is prepared and described to infer the output
///      schema. If no stream was requested or the query yields no columns,
///      it is run via ExecuteUpdateQuery() (and an empty stream carrying the
///      inferred schema is exported when a stream was requested). Otherwise
///      the query is wrapped in COPY ... TO STDOUT (FORMAT binary) and the
///      reader is exported to `stream`.
///
/// \param[out] stream Receives the result stream; may be null when the caller
///   does not want a result set.
/// \param[out] rows_affected If non-null, set to -1 on the COPY path; on the
///   other paths it is passed through to the delegated helper.
/// \param[out] error Populated with a message on failure.
/// \return ADBC_STATUS_OK on success; ADBC_STATUS_INVALID_STATE if no query
///   was set; ADBC_STATUS_IO if libpq fails to prepare/describe the query or
///   begin the COPY; ADBC_STATUS_INTERNAL if a nanoarrow step (schema
///   inference, schema export, field reader setup) fails; otherwise whatever
///   status the delegated helper returns.
AdbcStatusCode PostgresStatement::ExecuteQuery(struct ArrowArrayStream* stream,
                                               int64_t* rows_affected,
                                               struct AdbcError* error) {
  ClearResult();
  if (prepared_) {
    if (bind_.release || !stream) {
      return ExecutePreparedStatement(stream, rows_affected, error);
    }
    // XXX: don't use a prepared statement to execute a no-parameter
    // result-set-returning query for now, since we can't easily get
    // access to COPY there. (This might have to become sequential
    // executions of COPY (EXECUTE ($n, ...)) TO STDOUT which won't
    // get all the benefits of a prepared statement.) At preparation
    // time we don't know whether the query will be used with a result
    // set or not without analyzing the query (we could prepare both?)
    // and https://stackoverflow.com/questions/69233792 suggests that
    // you can't PREPARE a query containing COPY.
  }
  if (!stream && !ingest_.target.empty()) {
    return ExecuteUpdateBulk(rows_affected, error);
  }

  if (query_.empty()) {
    SetError(error, "%s", "[libpq] Must SetSqlQuery before ExecuteQuery");
    return ADBC_STATUS_INVALID_STATE;
  }

  // 1. Prepare the query to get the schema
  {
    // TODO: we should pipeline here and assume this will succeed
    PGresult* result = PQprepare(connection_->conn(), /*stmtName=*/"", query_.c_str(),
                                 /*nParams=*/0, nullptr);
    if (PQresultStatus(result) != PGRES_COMMAND_OK) {
      SetError(error,
               "[libpq] Failed to execute query: could not infer schema: failed to "
               "prepare query: %s\nQuery was:%s",
               PQerrorMessage(connection_->conn()), query_.c_str());
      PQclear(result);
      return ADBC_STATUS_IO;
    }
    PQclear(result);
    result = PQdescribePrepared(connection_->conn(), /*stmtName=*/"");
    if (PQresultStatus(result) != PGRES_COMMAND_OK) {
      SetError(error,
               "[libpq] Failed to execute query: could not infer schema: failed to "
               "describe prepared statement: %s\nQuery was:%s",
               PQerrorMessage(connection_->conn()), query_.c_str());
      PQclear(result);
      return ADBC_STATUS_IO;
    }

    // Resolve the information from the PGresult into a PostgresType
    PostgresType root_type;
    AdbcStatusCode status =
        ResolvePostgresType(*type_resolver_, result, &root_type, error);
    PQclear(result);
    if (status != ADBC_STATUS_OK) return status;

    // Initialize the copy reader and infer the output schema (i.e., error for
    // unsupported types before issuing the COPY query)
    reader_.copy_reader_.reset(new PostgresCopyStreamReader());
    reader_.copy_reader_->Init(root_type);
    // Zero the error struct so that the message is a valid (empty) C string
    // even if a failing call does not write to it.
    struct ArrowError na_error;
    std::memset(&na_error, 0, sizeof(na_error));
    int na_res = reader_.copy_reader_->InferOutputSchema(&na_error);
    if (na_res != NANOARROW_OK) {
      // na_res is an errno-style nanoarrow code, not an AdbcStatusCode: report
      // it in the message and return a proper ADBC status instead.
      SetError(error, "[libpq] Failed to infer output schema: (%d) %s: %s", na_res,
               std::strerror(na_res), na_error.message);
      return ADBC_STATUS_INTERNAL;
    }

    // If the caller did not request a result set or if there are no
    // inferred output columns (e.g. a CREATE or UPDATE), then don't
    // use COPY (which would fail anyways)
    if (!stream || root_type.n_children() == 0) {
      RAISE_ADBC(ExecuteUpdateQuery(rows_affected, error));
      if (stream) {
        struct ArrowSchema schema;
        std::memset(&schema, 0, sizeof(schema));
        // Check the nanoarrow result explicitly so that a failure surfaces as
        // an ADBC status code with an error message attached.
        na_res = reader_.copy_reader_->GetSchema(&schema);
        if (na_res != NANOARROW_OK) {
          SetError(error, "[libpq] Failed to export output schema: (%d) %s", na_res,
                   std::strerror(na_res));
          return ADBC_STATUS_INTERNAL;
        }
        nanoarrow::EmptyArrayStream::MakeUnique(&schema).move(stream);
      }
      return ADBC_STATUS_OK;
    }

    // This resolves the reader specific to each PostgresType -> ArrowSchema
    // conversion. It is unlikely that this will fail given that we have just
    // inferred these conversions ourselves.
    na_res = reader_.copy_reader_->InitFieldReaders(&na_error);
    if (na_res != NANOARROW_OK) {
      // Same as above: translate the nanoarrow code into an ADBC status.
      SetError(error, "[libpq] Failed to initialize field readers: (%d) %s: %s", na_res,
               std::strerror(na_res), na_error.message);
      return ADBC_STATUS_INTERNAL;
    }
  }

  // 2. Execute the query with COPY to get binary tuples
  {
    std::string copy_query = "COPY (" + query_ + ") TO STDOUT (FORMAT binary)";
    reader_.result_ =
        PQexecParams(connection_->conn(), copy_query.c_str(), /*nParams=*/0,
                     /*paramTypes=*/nullptr, /*paramValues=*/nullptr,
                     /*paramLengths=*/nullptr, /*paramFormats=*/nullptr, kPgBinaryFormat);
    if (PQresultStatus(reader_.result_) != PGRES_COPY_OUT) {
      SetError(error,
               "[libpq] Failed to execute query: could not begin COPY: %s\nQuery was: %s",
               PQerrorMessage(connection_->conn()), copy_query.c_str());
      ClearResult();
      return ADBC_STATUS_IO;
    }
    // Result is read from the connection, not the result, but we won't clear it here
  }

  reader_.ExportTo(stream);
  // The number of rows is not known up front when streaming via COPY.
  if (rows_affected) *rows_affected = -1;
  return ADBC_STATUS_OK;
}