void ProxyDestination::initializeTransport()

in mcrouter/ProxyDestination-inl.h [268:505]


void ProxyDestination<Transport>::initializeTransport() {
  // Builds a new Transport for this destination from the router options,
  // publishes it into transport_ and registers the request-status,
  // connection-status and authorization callbacks on it. Finally applies the
  // request throttle when one is configured. A transport must not already
  // exist when this is called.
  assert(!transport_);

  // ---- Connection options --------------------------------------------------
  ConnectionOptions connOpts(accessPoint());
  auto& routerOpts = proxy().router().opts();

  // TCP keep-alive, retry and timeout settings.
  connOpts.tcpKeepAliveCount = routerOpts.keepalive_cnt;
  connOpts.tcpKeepAliveIdle = routerOpts.keepalive_idle_s;
  connOpts.tcpKeepAliveInterval = routerOpts.keepalive_interval_s;
  connOpts.numConnectTimeoutRetries = routerOpts.connect_timeout_retries;
  connOpts.connectTimeout = shortestConnectTimeout();
  connOpts.writeTimeout = shortestWriteTimeout();
  connOpts.routerInfoName = proxy().router().routerInfoName();

  // The debug fifo path is only filled in when a fifo root is configured.
  if (!routerOpts.debug_fifo_root.empty()) {
    connOpts.debugFifoPath = getClientDebugFifoFullPath(routerOpts);
  }

  if (routerOpts.enable_qos) {
    connOpts.enableQoS = true;
    connOpts.qosClass = qosClass();
    connOpts.qosPath = qosPath();
  }

  connOpts.useJemallocNodumpAllocator = routerOpts.jemalloc_nodump_buffers;

  // Compression settings apply only to compressed access points, and only
  // when the router actually provides a codec manager.
  if (accessPoint()->compressed()) {
    auto codecManager = proxy().router().getCodecManager();
    if (codecManager) {
      connOpts.compressionCodecMap = codecManager->getCodecMap();
      connOpts.thriftCompression = true;
      connOpts.thriftCompressionThreshold =
          routerOpts.thrift_compression_threshold;
    }
  }

  if (accessPoint()->useSsl()) {
    auto& security = connOpts.securityOpts;
    security.sslPemCertPath = routerOpts.pem_cert_path;
    security.sslPemKeyPath = routerOpts.pem_key_path;
    // The CA path is only supplied when peer verification is requested.
    if (routerOpts.ssl_verify_peers) {
      security.sslPemCaPath = routerOpts.pem_ca_path;
    }
    security.sessionCachingEnabled = routerOpts.ssl_connection_cache;
    security.sslHandshakeOffload = routerOpts.ssl_handshake_offload;
    security.sslServiceIdentity = routerOpts.ssl_service_identity;
    security.sslAuthorizationEnforce =
        routerOpts.ssl_service_identity_authorization_enforce;
    security.tfoEnabledForSsl = routerOpts.enable_ssl_tfo;
    security.tlsPreferOcbCipher = routerOpts.tls_prefer_ocb_cipher;
  }

  // ---- Transport creation --------------------------------------------------
  // The transport is constructed before taking transportLock_; the lock is
  // held only while the new pointer is stored into transport_.
  using TransportPtr =
      std::unique_ptr<Transport, typename Transport::Destructor>;
  TransportPtr newTransport(
      new Transport(proxy().eventBase(), std::move(connOpts)),
      typename Transport::Destructor());
  {
    std::unique_lock guard(transportLock_);
    transport_ = std::move(newTransport);
  }

  transport_->setFlushList(&proxy().flushList());

  // ---- Request status callbacks --------------------------------------------
  // Adds the given diffs to the pending/inflight counters and raises the
  // matching "max" stat when the updated counter exceeds it.
  auto onStateChange = [this](int pendingDiff, int inflightDiff) {
    auto& proxyStats = proxy().stats();
    if (pendingDiff != 0) {
      proxyStats.increment(destination_pending_reqs_stat, pendingDiff);
      const auto peakPending =
          proxyStats.getValue(destination_max_pending_reqs_stat);
      const auto curPending =
          proxyStats.getValue(destination_pending_reqs_stat);
      proxyStats.setValue(
          destination_max_pending_reqs_stat,
          std::max(peakPending, curPending));
    }
    if (inflightDiff != 0) {
      proxyStats.increment(destination_inflight_reqs_stat, inflightDiff);
      const auto peakInflight =
          proxyStats.getValue(destination_max_inflight_reqs_stat);
      const auto curInflight =
          proxyStats.getValue(destination_inflight_reqs_stat);
      proxyStats.setValue(
          destination_max_inflight_reqs_stat,
          std::max(peakInflight, curInflight));
    }
  };

  // Counts one socket write / one batch, plus the number of requests in it.
  auto onWrite = [this](size_t numToSend) {
    auto& proxyStats = proxy().stats();
    proxyStats.increment(num_socket_writes_stat);
    proxyStats.increment(destination_batches_sum_stat);
    proxyStats.increment(destination_requests_sum_stat, numToSend);
  };

  auto onPartialWrite = [this]() {
    proxy().stats().increment(num_socket_partial_writes_stat);
  };

  transport_->setRequestStatusCallbacks(RequestStatusCallbacks{
      std::move(onStateChange),
      std::move(onWrite),
      std::move(onPartialWrite)});

  // ---- Connection status callbacks -----------------------------------------
  // Connection established: mark the destination Up and record connection and
  // session-resumption stats for whichever secure transport is in use.
  auto onConnectionUp = [this](
                            const folly::AsyncTransportWrapper& socket,
                            int64_t numConnectRetries) mutable {
    setState(State::Up);

    auto& proxyStats = proxy().stats();
    proxyStats.increment(num_connections_opened_stat);

    updatePoolStatConnections(true);

    const auto mech = accessPoint()->getSecurityMech();
    if (mech == SecurityMech::TLS_TO_PLAINTEXT) {
      if (const auto* plainSock =
              socket.getUnderlyingTransport<TlsToPlainTransport>()) {
        const auto tlsStats = plainSock->getStats();
        proxyStats.increment(num_tls_to_plain_connections_opened_stat);
        if (tlsStats.sessionReuseAttempted) {
          proxyStats.increment(num_tls_to_plain_resumption_attempts_stat);
        }
        if (tlsStats.sessionReuseSuccess) {
          proxyStats.increment(num_tls_to_plain_resumption_successes_stat);
        }
      } else if (
          const auto* thriftPlainSock =
              socket.getUnderlyingTransport<AsyncTlsToPlaintextSocket>()) {
        proxyStats.increment(num_tls_to_plain_connections_opened_stat);

        using Status = AsyncTlsToPlaintextSocket::SessionResumptionStatus;
        const auto resumption = thriftPlainSock->getSessionResumptionStatus();
        // A successful resumption counts as a success and as an attempt; a
        // failed one counts as an attempt only; anything else counts nothing.
        if (resumption == Status::RESUMPTION_ATTEMPTED_AND_SUCCEEDED) {
          proxyStats.increment(num_tls_to_plain_resumption_successes_stat);
        }
        if (resumption == Status::RESUMPTION_ATTEMPTED_AND_SUCCEEDED ||
            resumption == Status::RESUMPTION_ATTEMPTED_AND_FAILED) {
          proxyStats.increment(num_tls_to_plain_resumption_attempts_stat);
        }
      } else {
        // Neither TLS-to-plaintext transport is underneath this socket.
        proxyStats.increment(num_tls_to_plain_fallback_failures_stat);
      }
    } else if (mech == SecurityMech::KTLS12) {
      if (auto ktlsStats = McSSLUtil::getKtlsStats(socket)) {
        proxyStats.increment(num_ktls_connections_opened_stat);
        if (ktlsStats->sessionReuseAttempted) {
          proxyStats.increment(num_ktls_resumption_attempts_stat);
        }
        if (ktlsStats->sessionReuseSuccess) {
          proxyStats.increment(num_ktls_resumption_successes_stat);
        }
      } else {
        proxyStats.increment(num_ktls_fallback_failures_stat);
      }
    }

    // Deliberately not chained to the mechanism checks above: if TLS to
    // plaintext didn't work, the ssl socket stats can still be captured here.
    if (const auto* sslSock =
            socket.getUnderlyingTransport<folly::AsyncSSLSocket>()) {
      proxyStats.increment(num_ssl_connections_opened_stat);
      if (sslSock->sessionResumptionAttempted()) {
        proxyStats.increment(num_ssl_resumption_attempts_stat);
      }
      if (sslSock->getSSLSessionReused()) {
        proxyStats.increment(num_ssl_resumption_successes_stat);
      }
    } else if (
        const auto* fizzClient =
            socket.getUnderlyingTransport<McFizzClient>()) {
      proxyStats.increment(num_ssl_connections_opened_stat);
      if (fizzClient->pskResumed()) {
        proxyStats.increment(num_ssl_resumption_successes_stat);
        proxyStats.increment(num_ssl_resumption_attempts_stat);
      } else if (auto pskType = fizzClient->getState().pskType();
                 pskType && pskType.value() == fizz::PskType::Rejected) {
        // Session resumption was attempted, but failed.
        proxyStats.increment(num_ssl_resumption_attempts_stat);
      }
    }

    if (numConnectRetries > 0) {
      proxyStats.increment(num_connect_success_after_retrying_stat);
      proxyStats.increment(num_connect_retries_stat, numConnectRetries);
    }

    updateConnectionClosedInternalStat();
  };

  // Connection went down. This callback holds a copy of selfPtr_ and bails
  // out when it can no longer be locked, i.e. when the destination is gone.
  auto onConnectionDown = [weakSelf = selfPtr_](
                              ConnectionDownReason reason,
                              int64_t numConnectRetries) {
    const auto self = weakSelf.lock();
    if (!self) {
      LOG(WARNING) << "Proxy destination is already destroyed. "
                      "Stats will not be bumped.";
      return;
    }

    auto& proxyStats = self->proxy().stats();
    proxyStats.increment(num_connections_closed_stat);

    // For SSL access points, also bump the per-mechanism "closed" counter.
    if (self->accessPoint()->useSsl()) {
      const auto mech = self->accessPoint()->getSecurityMech();
      const auto closedStat = mech == SecurityMech::TLS_TO_PLAINTEXT
          ? num_tls_to_plain_connections_closed_stat
          : mech == SecurityMech::KTLS12 ? num_ktls_connections_closed_stat
                                         : num_ssl_connections_closed_stat;
      proxyStats.increment(closedStat);
    }

    self->updatePoolStatConnections(false);

    if (reason != ConnectionDownReason::ABORTED) {
      // In case of server going away, we should gracefully close the
      // connection (i.e. allow remaining outstanding requests to drain).
      if (reason == ConnectionDownReason::SERVER_GONE_AWAY) {
        self->closeGracefully();
      }
      self->setState(State::Down);

      const auto tkoResult = reason == ConnectionDownReason::CONNECT_TIMEOUT
          ? carbon::Result::CONNECT_TIMEOUT
          : carbon::Result::CONNECT_ERROR;
      self->handleTko(tkoResult, /* isProbeRequest= */ false);
    } else {
      self->setState(State::Closed);
    }

    proxyStats.increment(num_connect_retries_stat, numConnectRetries);
  };

  transport_->setConnectionStatusCallbacks(ConnectionStatusCallbacks{
      std::move(onConnectionUp), std::move(onConnectionDown)});

  // ---- Authorization callback ----------------------------------------------
  // Delegates to the router's service-identity authorization function when
  // one is set, counting successes and failures; without one, the connection
  // is treated as authorized.
  auto onAuthorize = [this](
                         const folly::AsyncTransportWrapper& socket,
                         const ConnectionOptions& connectionOptions) mutable {
    auto& authFn = proxy().router().svcIdentAuthCallbackFunc();
    if (!authFn) {
      return true;
    }
    if (!authFn(socket, connectionOptions)) {
      proxy().stats().increment(num_authorization_failures_stat);
      return false;
    }
    proxy().stats().increment(num_authorization_successes_stat);
    return true;
  };

  transport_->setAuthorizationCallbacks(
      AuthorizationCallbacks{std::move(onAuthorize)});

  // ---- Throttling ----------------------------------------------------------
  // A throttle is installed only for a positive inflight target.
  if (routerOpts.target_max_inflight_requests > 0) {
    transport_->setThrottle(
        routerOpts.target_max_inflight_requests,
        routerOpts.target_max_pending_requests);
  }
}