in thrift/lib/cpp2/server/Cpp2Connection.cpp [310:720]
void Cpp2Connection::requestReceived(
unique_ptr<HeaderServerChannel::HeaderRequest>&& hreq) {
auto& samplingStatus = hreq->getSamplingStatus();
std::chrono::steady_clock::time_point readEnd;
if (samplingStatus.isEnabled()) {
readEnd = std::chrono::steady_clock::now();
}
bool useHttpHandler = false;
// Any POST not for / should go to the status handler
if (hreq->getHeader()->getClientType() == THRIFT_HTTP_SERVER_TYPE) {
auto buf = hreq->getBuf();
// 7 == length of "POST / " - we are matching on the path
if (buf->length() >= 7 &&
0 == strncmp(reinterpret_cast<const char*>(buf->data()), "POST", 4) &&
buf->data()[6] != ' ') {
useHttpHandler = true;
}
// Any GET should use the handler
if (buf->length() >= 3 &&
0 == strncmp(reinterpret_cast<const char*>(buf->data()), "GET", 3)) {
useHttpHandler = true;
}
// Any HEAD should use the handler
if (buf->length() >= 4 &&
0 == strncmp(reinterpret_cast<const char*>(buf->data()), "HEAD", 4)) {
useHttpHandler = true;
}
}
if (useHttpHandler && worker_->getServer()->getGetHandler()) {
worker_->getServer()->getGetHandler()(
worker_->getEventBase(),
worker_->getConnectionManager(),
transport_,
hreq->extractBuf());
// Close the channel, since the handler now owns the socket.
channel_->setCallback(nullptr);
channel_->setTransport(nullptr);
stop();
return;
}
if (THRIFT_FLAG(server_header_reject_http) &&
(hreq->getHeader()->getClientType() == THRIFT_HTTP_SERVER_TYPE ||
hreq->getHeader()->getClientType() == THRIFT_HTTP_CLIENT_TYPE ||
hreq->getHeader()->getClientType() == THRIFT_HTTP_GET_CLIENT_TYPE)) {
disconnect("Rejecting HTTP connection over Header");
return;
}
if (THRIFT_FLAG(server_header_reject_framed) &&
(hreq->getHeader()->getClientType() == THRIFT_FRAMED_DEPRECATED ||
hreq->getHeader()->getClientType() == THRIFT_FRAMED_COMPACT)) {
disconnect("Rejecting framed connection over Header");
return;
}
if (THRIFT_FLAG(server_header_reject_unframed) &&
(hreq->getHeader()->getClientType() == THRIFT_UNFRAMED_DEPRECATED ||
hreq->getHeader()->getClientType() ==
THRIFT_UNFRAMED_COMPACT_DEPRECATED)) {
disconnect("Rejecting unframed connection over Header");
return;
}
auto protoId = static_cast<apache::thrift::protocol::PROTOCOL_TYPES>(
hreq->getHeader()->getProtocolId());
auto msgBegin = apache::thrift::detail::ap::deserializeMessageBegin(
*hreq->getBuf(), protoId);
std::string& methodName = msgBegin.methodName;
const auto& meta = msgBegin.metadata;
// Transport upgrade: check if client requested transport upgrade from header
// to rocket. If yes, reply immediately and upgrade the transport after
// sending the reply.
if (methodName == "upgradeToRocket") {
if (THRIFT_FLAG(server_rocket_upgrade_enabled)) {
ResponsePayload response;
switch (protoId) {
case apache::thrift::protocol::T_BINARY_PROTOCOL:
response = upgradeToRocketReply<apache::thrift::BinaryProtocolWriter>(
meta.seqId);
break;
case apache::thrift::protocol::T_COMPACT_PROTOCOL:
response =
upgradeToRocketReply<apache::thrift::CompactProtocolWriter>(
meta.seqId);
break;
default:
LOG(DFATAL) << "Unsupported protocol found";
// if protocol is neither binary or compact, we want to kill the
// request and abort upgrade
killRequest(
std::move(hreq),
TApplicationException::TApplicationExceptionType::
INVALID_PROTOCOL,
kUnknownErrorCode,
"invalid protocol used");
return;
}
hreq->sendReply(
std::move(response),
new TransportUpgradeSendCallback(
transport_,
context_.getPeerAddress(),
getWorker(),
this,
channel_.get()));
return;
} else {
killRequest(
std::move(hreq),
TApplicationException::TApplicationExceptionType::UNKNOWN_METHOD,
kMethodUnknownErrorCode,
"Rocket upgrade disabled");
return;
}
}
if (worker_->getServer()->isHeaderDisabled()) {
disconnect("Rejecting Header connection");
return;
}
using PerServiceMetadata = Cpp2Worker::PerServiceMetadata;
const PerServiceMetadata::FindMethodResult methodMetadataResult =
serviceMetadata_.findMethod(methodName);
auto baseReqCtx =
serviceMetadata_.getBaseContextForRequest(methodMetadataResult);
auto rootid = worker_->getRequestsRegistry()->genRootId();
auto reqCtx = baseReqCtx
? folly::RequestContext::copyAsRoot(*baseReqCtx, rootid)
: std::make_shared<folly::RequestContext>(rootid);
folly::RequestContextScopeGuard rctx(reqCtx);
auto server = worker_->getServer();
server->touchRequestTimestamp();
auto* observer = server->getObserver();
if (observer) {
observer->receivedRequest(&methodName);
}
auto injectedFailure = server->maybeInjectFailure();
switch (injectedFailure) {
case ThriftServer::InjectedFailure::NONE:
break;
case ThriftServer::InjectedFailure::ERROR:
killRequest(
std::move(hreq),
TApplicationException::TApplicationExceptionType::INJECTED_FAILURE,
kInjectedFailureErrorCode,
"injected failure");
return;
case ThriftServer::InjectedFailure::DROP:
VLOG(1) << "ERROR: injected drop: "
<< context_.getPeerAddress()->getAddressStr();
return;
case ThriftServer::InjectedFailure::DISCONNECT:
disconnect("injected failure");
return;
}
if (server->getGetHeaderHandler()) {
server->getGetHeaderHandler()(hreq->getHeader(), context_.getPeerAddress());
}
if (auto overloadResult = server->checkOverload(
&hreq->getHeader()->getHeaders(), &methodName)) {
killRequest(
std::move(hreq),
TApplicationException::LOADSHEDDING,
overloadResult.value(),
"loadshedding request");
return;
}
if (auto preprocessResult = server->preprocess(
{hreq->getHeader()->getHeaders(), methodName, context_});
!std::holds_alternative<std::monostate>(preprocessResult)) {
folly::variant_match(
preprocessResult,
[&](AppClientException& ace) {
handleAppError(std::move(hreq), ace.name(), ace.getMessage(), true);
},
[&](AppOverloadedException& aoe) {
killRequest(
std::move(hreq),
TApplicationException::LOADSHEDDING,
kAppOverloadedErrorCode,
aoe.getMessage().c_str());
},
[&](AppServerException& ase) {
handleAppError(std::move(hreq), ase.name(), ase.getMessage(), false);
},
[](std::monostate&) { folly::assume_unreachable(); });
return;
}
if (worker_->isStopping()) {
killRequest(
std::move(hreq),
TApplicationException::TApplicationExceptionType::INTERNAL_ERROR,
kQueueOverloadedErrorCode,
"server shutting down");
return;
}
if (!server->shouldHandleRequestForMethod(methodName)) {
killRequest(
std::move(hreq),
TApplicationException::TApplicationExceptionType::INTERNAL_ERROR,
kQueueOverloadedErrorCode,
"server not ready");
return;
}
// After this, the request buffer is no longer owned by the request
// and will be released after deserializeRequest.
auto serializedRequest = [&] {
folly::IOBufQueue bufQueue;
bufQueue.append(hreq->extractBuf());
bufQueue.trimStart(meta.size);
return SerializedRequest(bufQueue.move());
}();
// We keep a clone of the request payload buffer for debugging purposes, but
// the lifetime of payload should not necessarily be the same as its request
// object's.
auto debugPayload =
rocket::Payload::makeCombined(serializedRequest.buffer->clone(), 0);
std::chrono::milliseconds queueTimeout;
std::chrono::milliseconds taskTimeout;
std::chrono::milliseconds clientQueueTimeout =
hreq->getHeader()->getClientQueueTimeout();
std::chrono::milliseconds clientTimeout =
hreq->getHeader()->getClientTimeout();
auto differentTimeouts = server->getTaskExpireTimeForRequest(
clientQueueTimeout, clientTimeout, queueTimeout, taskTimeout);
folly::call_once(clientInfoFlag_, [&] {
if (const auto& m = hreq->getHeader()->extractClientMetadata()) {
context_.setClientMetadata(*m);
}
});
context_.setClientType(hreq->getHeader()->getClientType());
auto t2r = RequestsRegistry::makeRequest<Cpp2Request>(
std::move(hreq),
std::move(reqCtx),
this_,
std::move(debugPayload),
std::move(methodName));
server->incActiveRequests();
if (samplingStatus.isEnabled()) {
// Expensive operations; happens only when sampling is enabled
auto& timestamps = t2r->getTimestamps();
timestamps.setStatus(samplingStatus);
timestamps.readEnd = readEnd;
timestamps.processBegin = std::chrono::steady_clock::now();
if (samplingStatus.isEnabledByServer() && observer) {
if (threadManager_) {
observer->queuedRequests(threadManager_->pendingUpstreamTaskCount());
}
observer->activeRequests(server->getActiveRequests());
}
}
activeRequests_.insert(t2r.get());
auto reqContext = t2r->getContext();
if (observer) {
observer->admittedRequest(&reqContext->getMethodName());
}
if (differentTimeouts) {
if (queueTimeout > std::chrono::milliseconds(0)) {
scheduleTimeout(&t2r->queueTimeout_, queueTimeout);
}
}
if (taskTimeout > std::chrono::milliseconds(0)) {
scheduleTimeout(&t2r->taskTimeout_, taskTimeout);
}
if (clientTimeout > std::chrono::milliseconds::zero()) {
reqContext->setRequestTimeout(clientTimeout);
} else {
reqContext->setRequestTimeout(taskTimeout);
}
// Log monitoring methods that are called over header interface so that they
// can be migrated to rocket monitoring interface.
LoggingSampler monitoringLogSampler{
THRIFT_FLAG(monitoring_over_header_logging_sample_rate)};
if (monitoringLogSampler.isSampled()) {
if (isMonitoringMethodName(reqContext->getMethodName())) {
THRIFT_CONNECTION_EVENT(monitoring_over_header)
.logSampled(context_, monitoringLogSampler, [&] {
return folly::dynamic::object(
"method_name", reqContext->getMethodName());
});
}
}
try {
ResponseChannelRequest::UniquePtr req = std::move(t2r);
if (!apache::thrift::detail::ap::setupRequestContextWithMessageBegin(
meta, protoId, req, reqContext, worker_->getEventBase())) {
return;
}
folly::variant_match(
methodMetadataResult,
[&](PerServiceMetadata::MetadataNotImplemented) {
logSetupConnectionEventsOnce(setupLoggingFlag_, context_);
// The AsyncProcessorFactory does not implement createMethodMetadata
// so we need to fallback to processSerializedCompressedRequest.
processor_->processSerializedCompressedRequest(
std::move(req),
SerializedCompressedRequest(std::move(serializedRequest)),
protoId,
reqContext,
worker_->getEventBase(),
threadManager_.get());
},
[&](PerServiceMetadata::MetadataNotFound) {
AsyncProcessorHelper::sendUnknownMethodError(
std::move(req), reqContext->getMethodName());
},
[&](const PerServiceMetadata::MetadataFound& found) {
logSetupConnectionEventsOnce(setupLoggingFlag_, context_);
if (!server->resourcePoolSet().empty()) {
// We need to process this using request pools
const ServiceRequestInfo* serviceRequestInfo{nullptr};
if (serviceRequestInfoMap_) {
serviceRequestInfo = &serviceRequestInfoMap_->get().at(
reqContext->getMethodName());
}
ServerRequest serverRequest(
std::move(req),
SerializedCompressedRequest(std::move(serializedRequest)),
worker_->getEventBase(),
reqContext,
protoId,
folly::RequestContext::saveContext(),
processor_.get(),
&found.metadata,
serviceRequestInfo);
auto poolResult = AsyncProcessorHelper::selectResourcePool(
serverRequest, found.metadata);
if (auto* reject =
std::get_if<ServerRequestRejection>(&poolResult)) {
auto errorCode = kAppOverloadedErrorCode;
if (reject->applicationException().getType() ==
TApplicationException::UNKNOWN_METHOD) {
errorCode = kMethodUnknownErrorCode;
}
serverRequest.request()->sendErrorWrapped(
folly::exception_wrapper(
folly::in_place,
std::move(*reject).applicationException()),
errorCode);
return;
}
auto resourcePoolHandle =
std::get_if<std::reference_wrapper<const ResourcePoolHandle>>(
&poolResult);
DCHECK(
server->resourcePoolSet().hasResourcePool(*resourcePoolHandle));
auto& resourcePool =
server->resourcePoolSet().resourcePool(*resourcePoolHandle);
apache::thrift::detail::ServerRequestHelper::setExecutor(
serverRequest, resourcePool.executor().value_or(nullptr));
auto result = resourcePool.accept(std::move(serverRequest));
if (result) {
auto errorCode = kQueueOverloadedErrorCode;
serverRequest.request()->sendErrorWrapped(
folly::exception_wrapper(
folly::in_place,
std::move(std::move(result).value())
.applicationException()),
errorCode);
return;
}
} else {
processor_->processSerializedCompressedRequestWithMetadata(
std::move(req),
SerializedCompressedRequest(std::move(serializedRequest)),
found.metadata,
protoId,
reqContext,
worker_->getEventBase(),
threadManager_.get());
}
});
} catch (...) {
LOG(DFATAL) << "AsyncProcessor::process exception: "
<< folly::exceptionStr(std::current_exception());
}
}