in mcrouter/lib/network/AsyncMcClientImpl.cpp [431:535]
void AsyncMcClientImpl::connectSuccess() noexcept {
assert(connectionState_ == ConnectionState::Connecting);
DestructorGuard dg(this);
connectionState_ = ConnectionState::Up;
const auto mech = connectionOptions_.accessPoint->getSecurityMech();
if (isAsyncSSLSocketMech(mech)) {
auto* sslSocket = socket_->getUnderlyingTransport<folly::AsyncSSLSocket>();
assert(sslSocket != nullptr);
if (mech == SecurityMech::TLS_TO_PLAINTEXT) {
// if we negotiated the right alpn, then server also supports this and we
// can fall back to plaintext.
auto fallback = McSSLUtil::moveToPlaintext(*sslSocket);
if (!fallback) {
// configured to use this but failed to negotiate. will end up doing
// ssl anyway
auto errorMessage = folly::to<std::string>(
"Failed to negotiate TLS to plaintext. SSL will be used for ",
connectionOptions_.accessPoint->toHostPortString());
LOG_FAILURE(
"AsyncMcClient", failure::Category::kBadEnvironment, errorMessage);
} else {
// replace socket with this
socket_.reset(fallback.release());
// we need to set timeout options here. things like tcp opts and
// qos are sock opts and don't change on the FD.
socket_->setSendTimeout(connectionOptions_.writeTimeout.count());
}
} else if (mech == SecurityMech::KTLS12) {
auto ktlsSock = McSSLUtil::moveToKtls(*sslSocket);
// it's not an error if this fails - it could be a kernel that doesn't
// support this or something else - on error we'll just continue using
// user space ssl
if (ktlsSock) {
socket_.reset(ktlsSock.release());
// we need to set timeout options here. things like tcp opts and
// qos are sock opts and don't change on the FD.
socket_->setSendTimeout(connectionOptions_.writeTimeout.count());
}
}
}
// Now authorize the connection
if (isAsyncSSLSocketMech(mech) && authorizationCallbacks_.onAuthorize &&
!authorizationCallbacks_.onAuthorize(*socket_, connectionOptions_)) {
if (connectionOptions_.securityOpts.sslAuthorizationEnforce) {
// Enforcement is enabled, fail all requests and close connection.
isAborting_ = true;
processShutdown("Authorization failure");
isAborting_ = false;
return;
}
}
assert(queue_.getInflightRequestCount() == 0);
assert(queue_.getParserInitializer() == nullptr);
if (connectionOptions_.enableQoS) {
// guard this since getPeerAddress could throw
try {
folly::SocketAddress address;
socket_->getPeerAddress(&address);
auto asyncSock = socket_->getUnderlyingTransport<folly::AsyncSocket>();
if (asyncSock) {
checkWhetherQoSIsApplied(
asyncSock->getNetworkSocket().toFd(),
address,
connectionOptions_,
"AsyncMcClient");
}
} catch (const std::exception& ex) {
LOG_FAILURE(
"AsyncMcClient",
failure::Category::kSystemError,
"Failed to enableQOS: {}",
folly::exceptionStr(ex));
}
}
if (!connectionOptions_.debugFifoPath.empty()) {
if (auto fifoManager = FifoManager::getInstance()) {
if (auto fifo =
fifoManager->fetchThreadLocal(connectionOptions_.debugFifoPath)) {
debugFifo_ = ConnectionFifo(
std::move(fifo), socket_.get(), connectionOptions_.routerInfoName);
}
}
}
if (connectionCallbacks_.onUp) {
connectionCallbacks_.onUp(*socket_, getNumConnectRetries());
}
numConnectTimeoutRetriesLeft_ = connectionOptions_.numConnectTimeoutRetries;
scheduleNextWriterLoop();
parser_ = std::make_unique<ParserT>(
*this,
kReadBufferSizeMin,
kReadBufferSizeMax,
connectionOptions_.useJemallocNodumpAllocator,
connectionOptions_.compressionCodecMap,
&debugFifo_);
socket_->setReadCB(this);
}