in cpp/src/arrow/flight/transport/grpc/grpc_client.cc [686:839]
/// \brief Create the gRPC channel and the Flight service stub for this client.
///
/// Channel credentials are selected from the scheme of `location`:
///  - kSchemeGrpc / kSchemeGrpcTcp: insecure credentials, target `host:port`.
///  - kSchemeGrpcTls: SSL credentials built from the TLS fields of `options`, or,
///    when `options.disable_server_verification` is set, TLS credentials
///    configured to skip verification of the server.
///  - kSchemeGrpcUnix: insecure credentials, target `unix://<path>`.
///
/// \param[in] options client options (TLS material, hostname override, generic
///   channel arguments, middleware)
/// \param[in] location only its scheme is consulted here
/// \param[in] uri supplies the host, port and path of the target
/// \return Status::OK on success; Status::NotImplemented for an unsupported
///   scheme, or when server verification is disabled but this build was compiled
///   without GRPC_NAMESPACE_FOR_TLS_CREDENTIALS_OPTIONS.
Status Init(const FlightClientOptions& options, const Location& location,
            const arrow::util::Uri& uri) override {
  const std::string& scheme = location.scheme();

  std::stringstream grpc_uri;
  std::shared_ptr<::grpc::ChannelCredentials> creds;
  if (scheme == kSchemeGrpc || scheme == kSchemeGrpcTcp || scheme == kSchemeGrpcTls) {
    grpc_uri << arrow::util::UriEncodeHost(uri.host()) << ':' << uri.port_text();

    if (scheme == kSchemeGrpcTls) {
      if (options.disable_server_verification) {
#if defined(GRPC_NAMESPACE_FOR_TLS_CREDENTIALS_OPTIONS)
        namespace ge = ::GRPC_NAMESPACE_FOR_TLS_CREDENTIALS_OPTIONS;

#  if defined(GRPC_USE_CERTIFICATE_VERIFIER)
        // gRPC >= 1.43
        // Verifier that reports success for every request, synchronously.
        class NoOpCertificateVerifier : public ge::ExternalCertificateVerifier {
         public:
          bool Verify(ge::TlsCustomVerificationCheckRequest*,
                      std::function<void(::grpc::Status)>,
                      ::grpc::Status* sync_status) override {
            *sync_status = ::grpc::Status::OK;
            return true;  // Check done synchronously
          }
          void Cancel(ge::TlsCustomVerificationCheckRequest*) override {}
        };
        auto cert_verifier =
            ge::ExternalCertificateVerifier::Create<NoOpCertificateVerifier>();

#  else   // defined(GRPC_USE_CERTIFICATE_VERIFIER)
        // gRPC < 1.43
        // A callback to supply to TlsCredentialsOptions that accepts any server
        // arguments.
        struct NoOpTlsAuthorizationCheck
            : public ge::TlsServerAuthorizationCheckInterface {
          int Schedule(ge::TlsServerAuthorizationCheckArg* arg) override {
            arg->set_success(1);
            arg->set_status(GRPC_STATUS_OK);
            return 0;
          }
        };
        auto server_authorization_check = std::make_shared<NoOpTlsAuthorizationCheck>();
        // Stored in a member rather than a local.
        noop_auth_check_ = std::make_shared<ge::TlsServerAuthorizationCheckConfig>(
            server_authorization_check);
#  endif  // defined(GRPC_USE_CERTIFICATE_VERIFIER)

#  if defined(GRPC_USE_TLS_CHANNEL_CREDENTIALS_OPTIONS)
        auto certificate_provider =
            std::make_shared<::grpc::experimental::StaticDataCertificateProvider>(
                kDummyRootCert);
#    if defined(GRPC_USE_TLS_CHANNEL_CREDENTIALS_OPTIONS_ROOT_CERTS)
        ::grpc::experimental::TlsChannelCredentialsOptions tls_options(
            certificate_provider);
#    else   // defined(GRPC_USE_TLS_CHANNEL_CREDENTIALS_OPTIONS_ROOT_CERTS)
        // While gRPC >= 1.36 does not require a root cert (it has a default)
        // in practice the path it hardcodes is broken. See grpc/grpc#21655.
        ::grpc::experimental::TlsChannelCredentialsOptions tls_options;
        tls_options.set_certificate_provider(certificate_provider);
#    endif  // defined(GRPC_USE_TLS_CHANNEL_CREDENTIALS_OPTIONS_ROOT_CERTS)
        tls_options.watch_root_certs();
        tls_options.set_root_cert_name("dummy");
#    if defined(GRPC_USE_CERTIFICATE_VERIFIER)
        tls_options.set_certificate_verifier(std::move(cert_verifier));
        tls_options.set_check_call_host(false);
        tls_options.set_verify_server_certs(false);
#    else   // defined(GRPC_USE_CERTIFICATE_VERIFIER)
        tls_options.set_server_verification_option(
            grpc_tls_server_verification_option::GRPC_TLS_SKIP_ALL_SERVER_VERIFICATION);
        tls_options.set_server_authorization_check_config(noop_auth_check_);
#    endif  // defined(GRPC_USE_CERTIFICATE_VERIFIER)
#  elif defined(GRPC_NAMESPACE_FOR_TLS_CREDENTIALS_OPTIONS)
        // continues defined(GRPC_USE_TLS_CHANNEL_CREDENTIALS_OPTIONS)
        auto materials_config = std::make_shared<ge::TlsKeyMaterialsConfig>();
        materials_config->set_pem_root_certs(kDummyRootCert);
        ge::TlsCredentialsOptions tls_options(
            GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE,
            GRPC_TLS_SKIP_ALL_SERVER_VERIFICATION, materials_config,
            std::shared_ptr<ge::TlsCredentialReloadConfig>(), noop_auth_check_);
#  endif  // defined(GRPC_USE_TLS_CHANNEL_CREDENTIALS_OPTIONS)
        creds = ge::TlsCredentials(tls_options);
#else   // defined(GRPC_NAMESPACE_FOR_TLS_CREDENTIALS_OPTIONS)
        return Status::NotImplemented(
            "Using encryption with server verification disabled is unsupported. "
            "Please use a release of Arrow Flight built with gRPC 1.27 or higher.");
#endif  // defined(GRPC_NAMESPACE_FOR_TLS_CREDENTIALS_OPTIONS)
      } else {
        // Only copy over the TLS fields the caller actually provided; empty
        // fields leave the corresponding SslCredentialsOptions member untouched.
        ::grpc::SslCredentialsOptions ssl_options;
        if (!options.tls_root_certs.empty()) {
          ssl_options.pem_root_certs = options.tls_root_certs;
        }
        if (!options.cert_chain.empty()) {
          ssl_options.pem_cert_chain = options.cert_chain;
        }
        if (!options.private_key.empty()) {
          ssl_options.pem_private_key = options.private_key;
        }
        creds = ::grpc::SslCredentials(ssl_options);
      }
    } else {
      creds = ::grpc::InsecureChannelCredentials();
    }
  } else if (scheme == kSchemeGrpcUnix) {
    grpc_uri << "unix://" << uri.path();
    creds = ::grpc::InsecureChannelCredentials();
  } else {
    return Status::NotImplemented("Flight scheme ", scheme,
                                  " is not supported by the gRPC transport");
  }

  ::grpc::ChannelArguments args;
  // We can't set the same config value twice, so for values where
  // we want to set defaults, keep them in a map and update them;
  // then update them all at once
  std::unordered_map<std::string, int> default_args;
  // Try to reconnect quickly at first, in case the server is still starting up
  default_args[GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS] = 100;
  // Receive messages of any size
  default_args[GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH] = -1;
  // Setting this arg enables each client to open its own TCP connection to server,
  // not sharing one single connection, which becomes bottleneck under high load.
  default_args[GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL] = 1;

  if (!options.override_hostname.empty()) {
    args.SetSslTargetNameOverride(options.override_hostname);
  }

  // Allow setting generic gRPC options. Integer options overwrite the defaults
  // above (and each other) in the map; string options are applied immediately.
  for (const auto& [name, value] : options.generic_options) {
    if (const int* int_value = std::get_if<int>(&value)) {
      default_args[name] = *int_value;
    } else if (const std::string* string_value = std::get_if<std::string>(&value)) {
      args.SetString(name, *string_value);
    }
    // Otherwise unimplemented
  }
  for (const auto& [name, value] : default_args) {
    args.SetInt(name, value);
  }

  std::vector<std::unique_ptr<::grpc::experimental::ClientInterceptorFactoryInterface>>
      interceptors;
  // `options` is const, so the middleware list is copied into the factory.
  // make_unique ensures the factory is owned before the vector can reallocate.
  interceptors.emplace_back(
      std::make_unique<GrpcClientInterceptorAdapterFactory>(options.middleware));

  stub_ = pb::FlightService::NewStub(
      ::grpc::experimental::CreateCustomChannelWithInterceptors(
          grpc_uri.str(), creds, args, std::move(interceptors)));

#ifdef GRPC_ENABLE_ASYNC
  garbage_bin_ = std::make_shared<GrpcGarbageBin>();
#endif

  return Status::OK();
}