example/flight-sql/query-prepared.cc (141 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <arrow/builder.h>
#include <arrow/flight/sql/client.h>
#include <arrow/table_builder.h>
namespace {
std::string
getenv(const char* name)
{
auto value = std::getenv(name);
if (value)
{
return std::string(value);
}
else
{
return std::string("");
}
}
arrow::Result<std::unique_ptr<arrow::flight::sql::FlightSqlClient>>
connect(arrow::flight::FlightCallOptions& call_options)
{
auto uri = getenv("PGFLIGHTSQLURI");
if (uri.empty())
{
auto host = getenv("PGHOST");
if (host.empty())
{
host = "localhost";
}
auto sslmode = getenv("PGSSLMODE");
if (sslmode == "require" || sslmode == "verify-ca" || sslmode == "verify-full")
{
uri = std::string("grpc+tls://") + host + ":15432";
}
else
{
uri = std::string("grpc://") + host + ":15432";
}
}
ARROW_ASSIGN_OR_RAISE(auto location, arrow::flight::Location::Parse(uri));
arrow::flight::FlightClientOptions client_options;
auto sslrootcert = getenv("PGSSLROOTCERT");
if (sslrootcert.empty())
{
auto home = getenv("HOME");
if (!home.empty())
{
sslrootcert = home + "/.postgresql/root.crt";
}
}
if (!sslrootcert.empty())
{
std::ifstream input(sslrootcert);
if (input)
{
client_options.tls_root_certs =
std::string(std::istreambuf_iterator<char>{input}, {});
}
}
ARROW_ASSIGN_OR_RAISE(auto client,
arrow::flight::FlightClient::Connect(location, client_options));
auto user = getenv("PGUSER");
if (user.empty())
{
user = getenv("USER");
}
auto password = getenv("PGPASSWORD");
if (password.empty())
{
password = "";
}
auto database = getenv("PGDATABASE");
if (database.empty())
{
database = user;
}
call_options.headers.emplace_back("x-flight-sql-database", database);
ARROW_ASSIGN_OR_RAISE(auto bearer_token,
client->AuthenticateBasicToken(call_options, user, password));
const auto& bearer_name = bearer_token.first;
const auto& bearer_value = bearer_token.second;
if (!bearer_name.empty() && !bearer_value.empty())
{
call_options.headers.emplace_back(bearer_name, bearer_value);
}
return std::make_unique<arrow::flight::sql::FlightSqlClient>(std::move(client));
}
// Start query
arrow::Status
run()
{
arrow::flight::FlightCallOptions call_options;
ARROW_ASSIGN_OR_RAISE(auto sql_client, connect(call_options));
ARROW_ASSIGN_OR_RAISE(auto statement,
sql_client->Prepare(call_options,
"SELECT i "
" FROM generate_series(1, 100) "
" AS series (i) "
" WHERE i < $1"));
auto schema = arrow::schema({arrow::field("i", arrow::int32())});
ARROW_ASSIGN_OR_RAISE(
auto record_batch_builder,
arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool()));
auto i_builder = record_batch_builder->GetFieldAs<arrow::Int32Builder>(0);
ARROW_RETURN_NOT_OK(i_builder->Append(10));
ARROW_ASSIGN_OR_RAISE(auto record_batch, record_batch_builder->Flush());
ARROW_RETURN_NOT_OK(statement->SetParameters(record_batch));
ARROW_ASSIGN_OR_RAISE(auto info, statement->Execute(call_options));
for (const auto& endpoint : info->endpoints())
{
ARROW_ASSIGN_OR_RAISE(auto reader,
sql_client->DoGet(call_options, endpoint.ticket));
while (true)
{
ARROW_ASSIGN_OR_RAISE(auto chunk, reader->Next());
if (!chunk.data)
{
break;
}
std::cout << chunk.data->ToString() << std::endl;
}
}
ARROW_RETURN_NOT_OK(statement->Close(call_options));
return sql_client->Close();
}
// End query
}; // namespace
int
main(int argc, char** argv)
{
auto status = run();
if (status.ok())
{
return EXIT_SUCCESS;
}
else
{
std::cerr << status.ToString() << std::endl;
return EXIT_FAILURE;
}
}