Status Init()

in cpp/src/arrow/flight/transport/grpc/grpc_client.cc [686:839]


  /// \brief Create the gRPC channel and the Flight service stub for this client.
  ///
  /// Channel credentials and the gRPC target string are chosen from the scheme
  /// of `location`:
  ///  - kSchemeGrpc / kSchemeGrpcTcp: "host:port" target, insecure credentials.
  ///  - kSchemeGrpcTls: "host:port" target, TLS credentials. When
  ///    `options.disable_server_verification` is set, credentials that accept
  ///    any server certificate are built (how depends on the gRPC version
  ///    macros); otherwise SSL credentials are built from the non-empty PEM
  ///    fields of `options`.
  ///  - kSchemeGrpcUnix: "unix://<path>" target, insecure credentials.
  ///
  /// Default channel arguments are merged with `options.generic_options`
  /// (integer options override the defaults), a middleware interceptor is
  /// installed, and the resulting stub is stored in `stub_`.
  ///
  /// \param[in] options client options (TLS material, hostname override,
  ///            generic gRPC arguments, middleware)
  /// \param[in] location location whose scheme selects the transport flavor
  /// \param[in] uri parsed URI supplying host/port or the socket path
  /// \return Status::OK on success; Status::NotImplemented if the scheme is not
  ///         handled here, or if server verification is disabled but the
  ///         TLS credentials options API is unavailable at build time.
  Status Init(const FlightClientOptions& options, const Location& location,
              const arrow::util::Uri& uri) override {
    const std::string& scheme = location.scheme();

    std::stringstream grpc_uri;
    std::shared_ptr<::grpc::ChannelCredentials> creds;
    if (scheme == kSchemeGrpc || scheme == kSchemeGrpcTcp || scheme == kSchemeGrpcTls) {
      grpc_uri << arrow::util::UriEncodeHost(uri.host()) << ':' << uri.port_text();

      if (scheme == kSchemeGrpcTls) {
        if (options.disable_server_verification) {
#if defined(GRPC_NAMESPACE_FOR_TLS_CREDENTIALS_OPTIONS)
          namespace ge = ::GRPC_NAMESPACE_FOR_TLS_CREDENTIALS_OPTIONS;

#  if defined(GRPC_USE_CERTIFICATE_VERIFIER)
          // gRPC >= 1.43
          // Verifier that synchronously reports success for every certificate.
          class NoOpCertificateVerifier : public ge::ExternalCertificateVerifier {
           public:
            bool Verify(ge::TlsCustomVerificationCheckRequest*,
                        std::function<void(::grpc::Status)>,
                        ::grpc::Status* sync_status) override {
              *sync_status = ::grpc::Status::OK;
              return true;  // Check done synchronously
            }
            void Cancel(ge::TlsCustomVerificationCheckRequest*) override {}
          };
          auto cert_verifier =
              ge::ExternalCertificateVerifier::Create<NoOpCertificateVerifier>();

#  else   // defined(GRPC_USE_CERTIFICATE_VERIFIER)
          // gRPC < 1.43
          // A callback to supply to TlsCredentialsOptions that accepts any server
          // arguments.
          struct NoOpTlsAuthorizationCheck
              : public ge::TlsServerAuthorizationCheckInterface {
            int Schedule(ge::TlsServerAuthorizationCheckArg* arg) override {
              arg->set_success(1);
              arg->set_status(GRPC_STATUS_OK);
              return 0;
            }
          };
          auto server_authorization_check = std::make_shared<NoOpTlsAuthorizationCheck>();
          // Stored in a member rather than a local.
          noop_auth_check_ = std::make_shared<ge::TlsServerAuthorizationCheckConfig>(
              server_authorization_check);
#  endif  // defined(GRPC_USE_CERTIFICATE_VERIFIER)

#  if defined(GRPC_USE_TLS_CHANNEL_CREDENTIALS_OPTIONS)
          auto certificate_provider =
              std::make_shared<::grpc::experimental::StaticDataCertificateProvider>(
                  kDummyRootCert);
#    if defined(GRPC_USE_TLS_CHANNEL_CREDENTIALS_OPTIONS_ROOT_CERTS)
          ::grpc::experimental::TlsChannelCredentialsOptions tls_options(
              certificate_provider);
#    else   // defined(GRPC_USE_TLS_CHANNEL_CREDENTIALS_OPTIONS_ROOT_CERTS)
          // While gRPC >= 1.36 does not require a root cert (it has a default)
          // in practice the path it hardcodes is broken. See grpc/grpc#21655.
          ::grpc::experimental::TlsChannelCredentialsOptions tls_options;
          tls_options.set_certificate_provider(certificate_provider);
#    endif  // defined(GRPC_USE_TLS_CHANNEL_CREDENTIALS_OPTIONS_ROOT_CERTS)
          tls_options.watch_root_certs();
          tls_options.set_root_cert_name("dummy");
#    if defined(GRPC_USE_CERTIFICATE_VERIFIER)
          tls_options.set_certificate_verifier(std::move(cert_verifier));
          tls_options.set_check_call_host(false);
          tls_options.set_verify_server_certs(false);
#    else   // defined(GRPC_USE_CERTIFICATE_VERIFIER)
          tls_options.set_server_verification_option(
              grpc_tls_server_verification_option::GRPC_TLS_SKIP_ALL_SERVER_VERIFICATION);
          tls_options.set_server_authorization_check_config(noop_auth_check_);
#    endif  // defined(GRPC_USE_CERTIFICATE_VERIFIER)
#  elif defined(GRPC_NAMESPACE_FOR_TLS_CREDENTIALS_OPTIONS)
          // continues defined(GRPC_USE_TLS_CHANNEL_CREDENTIALS_OPTIONS)
          auto materials_config = std::make_shared<ge::TlsKeyMaterialsConfig>();
          materials_config->set_pem_root_certs(kDummyRootCert);
          ge::TlsCredentialsOptions tls_options(
              GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE,
              GRPC_TLS_SKIP_ALL_SERVER_VERIFICATION, materials_config,
              std::shared_ptr<ge::TlsCredentialReloadConfig>(), noop_auth_check_);
#  endif  // defined(GRPC_USE_TLS_CHANNEL_CREDENTIALS_OPTIONS)
          creds = ge::TlsCredentials(tls_options);
#else   // defined(GRPC_NAMESPACE_FOR_TLS_CREDENTIALS_OPTIONS)
          return Status::NotImplemented(
              "Using encryption with server verification disabled is unsupported. "
              "Please use a release of Arrow Flight built with gRPC 1.27 or higher.");
#endif  // defined(GRPC_NAMESPACE_FOR_TLS_CREDENTIALS_OPTIONS)
        } else {
          // Regular TLS: only override the fields the caller actually supplied,
          // leaving the others at their SslCredentialsOptions defaults.
          ::grpc::SslCredentialsOptions ssl_options;
          if (!options.tls_root_certs.empty()) {
            ssl_options.pem_root_certs = options.tls_root_certs;
          }
          if (!options.cert_chain.empty()) {
            ssl_options.pem_cert_chain = options.cert_chain;
          }
          if (!options.private_key.empty()) {
            ssl_options.pem_private_key = options.private_key;
          }
          creds = ::grpc::SslCredentials(ssl_options);
        }
      } else {
        creds = ::grpc::InsecureChannelCredentials();
      }
    } else if (scheme == kSchemeGrpcUnix) {
      grpc_uri << "unix://" << uri.path();
      creds = ::grpc::InsecureChannelCredentials();
    } else {
      return Status::NotImplemented("Flight scheme ", scheme,
                                    " is not supported by the gRPC transport");
    }

    ::grpc::ChannelArguments args;
    // We can't set the same config value twice, so for values where
    // we want to set defaults, keep them in a map and update them;
    // then apply them all at once
    std::unordered_map<std::string, int> default_args;
    // Try to reconnect quickly at first, in case the server is still starting up
    default_args[GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS] = 100;
    // Receive messages of any size
    default_args[GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH] = -1;
    // Setting this arg enables each client to open its own TCP connection to the
    // server instead of sharing a single connection, which becomes a bottleneck
    // under high load.
    default_args[GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL] = 1;

    if (!options.override_hostname.empty()) {
      args.SetSslTargetNameOverride(options.override_hostname);
    }

    // Allow setting generic gRPC options. Integer options go through
    // default_args so that they replace a default of the same name; string
    // options are applied directly.
    for (const auto& arg : options.generic_options) {
      if (std::holds_alternative<int>(arg.second)) {
        default_args[arg.first] = std::get<int>(arg.second);
      } else if (std::holds_alternative<std::string>(arg.second)) {
        args.SetString(arg.first, std::get<std::string>(arg.second));
      }
      // Otherwise unimplemented
    }
    for (const auto& [arg_name, arg_value] : default_args) {
      args.SetInt(arg_name, arg_value);
    }

    std::vector<std::unique_ptr<::grpc::experimental::ClientInterceptorFactoryInterface>>
        interceptors;
    // `options` is const, so the middleware list is copied here (a std::move
    // would silently degrade to the same copy). make_unique keeps the factory
    // owned at all times, so it cannot leak if growing the vector throws.
    interceptors.push_back(
        std::make_unique<GrpcClientInterceptorAdapterFactory>(options.middleware));

    stub_ = pb::FlightService::NewStub(
        ::grpc::experimental::CreateCustomChannelWithInterceptors(
            grpc_uri.str(), creds, args, std::move(interceptors)));

#ifdef GRPC_ENABLE_ASYNC
    garbage_bin_ = std::make_shared<GrpcGarbageBin>();
#endif

    return Status::OK();
  }