in mcrouter/ProxyDestination-inl.h [268:505]
// Creates the transport for this destination and wires up all of its
// callbacks.
//
// The work happens in this order:
//   1. A ConnectionOptions is filled in from the router options and the
//      access point (keepalive, timeouts, debug fifo, QoS, compression, SSL).
//   2. A new Transport is constructed on the proxy's event base and stored
//      into transport_ while transportLock_ is held.
//   3. The flush list, request-status, connection-status and authorization
//      callbacks are registered; each of them updates proxy().stats().
//   4. Throttling is enabled when target_max_inflight_requests is positive.
//
// Precondition (asserted): transport_ has not been created yet.
void ProxyDestination<Transport>::initializeTransport() {
  assert(!transport_);

  // ---- 1. Connection options -------------------------------------------
  ConnectionOptions connOpts(accessPoint());
  auto& routerOpts = proxy().router().opts();

  connOpts.tcpKeepAliveCount = routerOpts.keepalive_cnt;
  connOpts.tcpKeepAliveIdle = routerOpts.keepalive_idle_s;
  connOpts.tcpKeepAliveInterval = routerOpts.keepalive_interval_s;
  connOpts.numConnectTimeoutRetries = routerOpts.connect_timeout_retries;
  connOpts.connectTimeout = shortestConnectTimeout();
  connOpts.writeTimeout = shortestWriteTimeout();
  connOpts.routerInfoName = proxy().router().routerInfoName();

  // The debug fifo path is only set when a fifo root is configured.
  if (!routerOpts.debug_fifo_root.empty()) {
    connOpts.debugFifoPath = getClientDebugFifoFullPath(routerOpts);
  }

  if (routerOpts.enable_qos) {
    connOpts.enableQoS = true;
    connOpts.qosClass = qosClass();
    connOpts.qosPath = qosPath();
  }

  connOpts.useJemallocNodumpAllocator = routerOpts.jemalloc_nodump_buffers;

  // Compression settings are applied only for compressed access points, and
  // only when the router actually provides a codec manager.
  if (accessPoint()->compressed()) {
    if (auto codecManager = proxy().router().getCodecManager()) {
      connOpts.compressionCodecMap = codecManager->getCodecMap();
      connOpts.thriftCompression = true;
      connOpts.thriftCompressionThreshold =
          routerOpts.thrift_compression_threshold;
    }
  }

  if (accessPoint()->useSsl()) {
    auto& security = connOpts.securityOpts;
    security.sslPemCertPath = routerOpts.pem_cert_path;
    security.sslPemKeyPath = routerOpts.pem_key_path;
    // The CA path is left untouched unless peer verification is requested.
    if (routerOpts.ssl_verify_peers) {
      security.sslPemCaPath = routerOpts.pem_ca_path;
    }
    security.sessionCachingEnabled = routerOpts.ssl_connection_cache;
    security.sslHandshakeOffload = routerOpts.ssl_handshake_offload;
    security.sslServiceIdentity = routerOpts.ssl_service_identity;
    security.sslAuthorizationEnforce =
        routerOpts.ssl_service_identity_authorization_enforce;
    security.tfoEnabledForSsl = routerOpts.enable_ssl_tfo;
    security.tlsPreferOcbCipher = routerOpts.tls_prefer_ocb_cipher;
  }

  // ---- 2. Transport construction ---------------------------------------
  using TransportPtr =
      std::unique_ptr<Transport, typename Transport::Destructor>;
  TransportPtr newTransport(
      new Transport(proxy().eventBase(), std::move(connOpts)),
      typename Transport::Destructor());
  {
    // Only the assignment to transport_ happens under transportLock_.
    std::unique_lock guard(transportLock_);
    transport_ = std::move(newTransport);
  }

  // ---- 3. Callbacks ----------------------------------------------------
  transport_->setFlushList(&proxy().flushList());

  transport_->setRequestStatusCallbacks(RequestStatusCallbacks{
      // onStateChange: apply the pending/inflight deltas and keep the
      // corresponding "max" stats at the highest value seen so far.
      [this](int pending, int inflight) {
        auto& stats = proxy().stats();
        if (pending != 0) {
          stats.increment(destination_pending_reqs_stat, pending);
          const auto prevMax =
              stats.getValue(destination_max_pending_reqs_stat);
          const auto current = stats.getValue(destination_pending_reqs_stat);
          stats.setValue(
              destination_max_pending_reqs_stat, std::max(prevMax, current));
        }
        if (inflight != 0) {
          stats.increment(destination_inflight_reqs_stat, inflight);
          const auto prevMax =
              stats.getValue(destination_max_inflight_reqs_stat);
          const auto current = stats.getValue(destination_inflight_reqs_stat);
          stats.setValue(
              destination_max_inflight_reqs_stat, std::max(prevMax, current));
        }
      },
      // onWrite: one socket write and one batch, carrying numToSend requests.
      [this](size_t numToSend) {
        auto& stats = proxy().stats();
        stats.increment(num_socket_writes_stat);
        stats.increment(destination_batches_sum_stat);
        stats.increment(destination_requests_sum_stat, numToSend);
      },
      // onPartialWrite
      [this]() {
        proxy().stats().increment(num_socket_partial_writes_stat);
      }});

  transport_->setConnectionStatusCallbacks(ConnectionStatusCallbacks{
      // Connection established: mark the destination Up and record which
      // kind of secure transport (if any) ended up underneath the socket.
      [this](
          const folly::AsyncTransportWrapper& socket,
          int64_t numConnectRetries) {
        setState(State::Up);
        auto& stats = proxy().stats();
        stats.increment(num_connections_opened_stat);
        updatePoolStatConnections(true);

        const auto securityMech = accessPoint()->getSecurityMech();

        if (securityMech == SecurityMech::TLS_TO_PLAINTEXT) {
          if (const auto* plainSock =
                  socket.getUnderlyingTransport<TlsToPlainTransport>()) {
            auto tlsStats = plainSock->getStats();
            stats.increment(num_tls_to_plain_connections_opened_stat);
            if (tlsStats.sessionReuseAttempted) {
              stats.increment(num_tls_to_plain_resumption_attempts_stat);
            }
            if (tlsStats.sessionReuseSuccess) {
              stats.increment(num_tls_to_plain_resumption_successes_stat);
            }
          } else if (
              const auto* thriftSock =
                  socket
                      .getUnderlyingTransport<AsyncTlsToPlaintextSocket>()) {
            stats.increment(num_tls_to_plain_connections_opened_stat);
            using ResumptionStatus =
                AsyncTlsToPlaintextSocket::SessionResumptionStatus;
            const auto resumption = thriftSock->getSessionResumptionStatus();
            // A successful resumption counts as a success and as an attempt;
            // a failed one counts as an attempt only.
            if (resumption ==
                ResumptionStatus::RESUMPTION_ATTEMPTED_AND_SUCCEEDED) {
              stats.increment(num_tls_to_plain_resumption_successes_stat);
            }
            if (resumption ==
                    ResumptionStatus::RESUMPTION_ATTEMPTED_AND_SUCCEEDED ||
                resumption ==
                    ResumptionStatus::RESUMPTION_ATTEMPTED_AND_FAILED) {
              stats.increment(num_tls_to_plain_resumption_attempts_stat);
            }
          } else {
            // Neither TLS-to-plaintext transport is present on the socket.
            stats.increment(num_tls_to_plain_fallback_failures_stat);
          }
        }

        if (securityMech == SecurityMech::KTLS12) {
          if (auto ktlsStats = McSSLUtil::getKtlsStats(socket)) {
            stats.increment(num_ktls_connections_opened_stat);
            if (ktlsStats->sessionReuseAttempted) {
              stats.increment(num_ktls_resumption_attempts_stat);
            }
            if (ktlsStats->sessionReuseSuccess) {
              stats.increment(num_ktls_resumption_successes_stat);
            }
          } else {
            stats.increment(num_ktls_fallback_failures_stat);
          }
        }

        // Deliberately not chained with the checks above: if TLS-to-plain
        // did not work out, the SSL socket stats are still captured here.
        if (const auto* sslSock =
                socket.getUnderlyingTransport<folly::AsyncSSLSocket>()) {
          stats.increment(num_ssl_connections_opened_stat);
          if (sslSock->sessionResumptionAttempted()) {
            stats.increment(num_ssl_resumption_attempts_stat);
          }
          if (sslSock->getSSLSessionReused()) {
            stats.increment(num_ssl_resumption_successes_stat);
          }
        } else if (
            const auto* fizzClient =
                socket.getUnderlyingTransport<McFizzClient>()) {
          stats.increment(num_ssl_connections_opened_stat);
          if (fizzClient->pskResumed()) {
            stats.increment(num_ssl_resumption_successes_stat);
            stats.increment(num_ssl_resumption_attempts_stat);
          } else {
            auto pskType = fizzClient->getState().pskType();
            if (pskType && pskType.value() == fizz::PskType::Rejected) {
              // Session resumption was attempted, but failed.
              stats.increment(num_ssl_resumption_attempts_stat);
            }
          }
        }

        if (numConnectRetries > 0) {
          stats.increment(num_connect_success_after_retrying_stat);
          stats.increment(num_connect_retries_stat, numConnectRetries);
        }
        updateConnectionClosedInternalStat();
      },
      // Connection went down. This callback holds a copy of selfPtr_ and
      // locks it first, so it does nothing beyond logging once the
      // destination is gone.
      [weakSelf = selfPtr_](
          ConnectionDownReason reason, int64_t numConnectRetries) {
        auto self = weakSelf.lock();
        if (!self) {
          LOG(WARNING) << "Proxy destination is already destroyed. "
                          "Stats will not be bumped.";
          return;
        }

        auto& stats = self->proxy().stats();
        stats.increment(num_connections_closed_stat);
        if (self->accessPoint()->useSsl()) {
          const auto securityMech = self->accessPoint()->getSecurityMech();
          if (securityMech == SecurityMech::TLS_TO_PLAINTEXT) {
            stats.increment(num_tls_to_plain_connections_closed_stat);
          } else if (securityMech == SecurityMech::KTLS12) {
            stats.increment(num_ktls_connections_closed_stat);
          } else {
            stats.increment(num_ssl_connections_closed_stat);
          }
        }
        self->updatePoolStatConnections(false);

        if (reason != ConnectionDownReason::ABORTED) {
          // In case of server going away, we should gracefully close the
          // connection (i.e. allow remaining outstanding requests to drain).
          if (reason == ConnectionDownReason::SERVER_GONE_AWAY) {
            self->closeGracefully();
          }
          self->setState(State::Down);
          const auto tkoResult =
              reason == ConnectionDownReason::CONNECT_TIMEOUT
              ? carbon::Result::CONNECT_TIMEOUT
              : carbon::Result::CONNECT_ERROR;
          self->handleTko(tkoResult, /* isProbeRequest= */ false);
        } else {
          // An aborted connection is marked Closed; handleTko is not called.
          self->setState(State::Closed);
        }

        stats.increment(num_connect_retries_stat, numConnectRetries);
      }});

  transport_->setAuthorizationCallbacks(AuthorizationCallbacks{
      // Returns true when no authorization callback is installed on the
      // router, otherwise the callback's verdict; each verdict is counted.
      [this](
          const folly::AsyncTransportWrapper& socket,
          const ConnectionOptions& connectionOptions) {
        auto& authorize = proxy().router().svcIdentAuthCallbackFunc();
        if (!authorize) {
          return true;
        }
        if (!authorize(socket, connectionOptions)) {
          proxy().stats().increment(num_authorization_failures_stat);
          return false;
        }
        proxy().stats().increment(num_authorization_successes_stat);
        return true;
      }});

  // ---- 4. Throttling ---------------------------------------------------
  if (routerOpts.target_max_inflight_requests > 0) {
    transport_->setThrottle(
        routerOpts.target_max_inflight_requests,
        routerOpts.target_max_pending_requests);
  }
}