in thrift/lib/cpp2/transport/rocket/server/ThriftRocketServerHandler.cpp [368:721]
// Common handling path for an incoming request payload.
//
// Unpacks `payload` into request metadata and data, then runs a sequence of
// checks. Each failing check builds a request object through `makeRequest`
// and hands it to the matching handle*() rejection routine before returning.
// If every check passes, the request is dispatched either to a resource pool
// or directly to `processor_`.
//
// Parameters:
//   payload      - raw request payload; cloned into a debug copy and then
//                  consumed by unpackAsCompressed().
//   makeRequest  - callable invoked as
//                  makeRequest(metadata, payload, requestContext); its result
//                  is used as a pointer-like request handle below.
//                  (Its type F is presumably a template parameter declared
//                  just above this definition - not visible here.)
//   expectedKind - RPC kind that the metadata is validated against via
//                  isMetadataValid().
//
// Exceptions thrown inside the final dispatch section are caught and logged
// with LOG(DFATAL); see the try/catch at the end of the function.
void ThriftRocketServerHandler::handleRequestCommon(
Payload&& payload, F&& makeRequest, RpcKind expectedKind) {
// setup request sampling for counters and stats
auto samplingStatus = shouldSample();
std::chrono::steady_clock::time_point readEnd;
if (UNLIKELY(samplingStatus.isEnabled())) {
readEnd = std::chrono::steady_clock::now();
}
// Clone the payload before it is consumed below; the clone is what gets
// attached to most of the request objects built later in this function.
rocket::Payload debugPayload = payload.clone();
auto requestPayloadTry =
unpackAsCompressed<RequestPayload>(std::move(payload));
// Wrapper around makeRequest that increments the active-request counter
// first. Used by the early-rejection paths.
auto makeActiveRequest = [&](auto&& md, auto&& payload, auto&& reqCtx) {
serverConfigs_->incActiveRequests();
return makeRequest(std::move(md), std::move(payload), std::move(reqCtx));
};
// Builds a fresh RequestContext with a newly generated root id. Used by the
// rejection paths that return before the per-method base context lookup
// further down.
auto createDefaultRequestContext = [&] {
return std::make_shared<folly::RequestContext>(
requestsRegistry_->genRootId());
};
// Unpacking failed: report it through the decompression-failure handler using
// default-constructed metadata and an empty payload.
if (requestPayloadTry.hasException()) {
handleDecompressionFailure(
makeActiveRequest(
RequestRpcMetadata(),
rocket::Payload{},
createDefaultRequestContext()),
requestPayloadTry.exception().what().toStdString());
return;
}
// References into the unpacked result; `data` may be replaced by its
// uncompressed form below.
auto& data = requestPayloadTry->payload;
auto& metadata = requestPayloadTry->metadata;
// Reject requests whose metadata does not validate for the expected RPC kind.
if (!isMetadataValid(metadata, expectedKind)) {
handleRequestWithBadMetadata(makeActiveRequest(
std::move(metadata),
std::move(debugPayload),
createDefaultRequestContext()));
return;
}
// Reject requests while the worker reports that it is stopping.
if (worker_->isStopping()) {
handleServerShutdown(makeActiveRequest(
std::move(metadata),
std::move(debugPayload),
createDefaultRequestContext()));
return;
}
// Log a summary of the request headers: the number of otherMetadata entries,
// their keys, and the chain data length of frameworkMetadata (0 if unset).
THRIFT_APPLICATION_EVENT(server_read_headers).log([&] {
auto size =
metadata.otherMetadata_ref() ? metadata.otherMetadata_ref()->size() : 0;
std::vector<folly::dynamic> keys;
if (size) {
keys.reserve(size);
for (auto& [k, v] : *metadata.otherMetadata_ref()) {
keys.push_back(k);
}
}
// NOTE(review): the chain length is stored in an int - confirm it cannot
// exceed the int range for the sizes expected here.
int fmd_sz = 0;
if (auto fmd = metadata.frameworkMetadata_ref()) {
DCHECK(*fmd) << "initialized IOBuf is null";
fmd_sz = (**fmd).computeChainDataLength();
}
return folly::dynamic::object("size", size) //
("keys", folly::dynamic::array(std::move(keys))) //
("frameworkMetadataSize", fmd_sz);
});
// Only when a checksum is present: uncompress `data` now (if a compression
// algorithm is set). The checksum below is computed over `data` as it stands
// after this step. Any exception from uncompressing is reported as a
// decompression failure.
if (metadata.crc32c_ref()) {
try {
if (auto compression = metadata.compression_ref()) {
data = uncompressBuffer(std::move(data), *compression);
}
} catch (...) {
handleDecompressionFailure(
makeActiveRequest(
std::move(metadata),
rocket::Payload{},
createDefaultRequestContext()),
folly::exceptionStr(std::current_exception()).toStdString());
return;
}
}
// check the checksum
const bool badChecksum = metadata.crc32c_ref() &&
(*metadata.crc32c_ref() != checksum::crc32c(*data));
if (badChecksum) {
handleRequestWithBadChecksum(makeActiveRequest(
std::move(metadata),
std::move(debugPayload),
createDefaultRequestContext()));
return;
}
// Fault injection: if the server asks for a failure to be injected, map the
// server-side InjectedFailure value onto the handler's InjectedFault and hand
// the request to handleInjectedFault().
if (auto injectedFailure = worker_->getServer()->maybeInjectFailure();
injectedFailure != ThriftServer::InjectedFailure::NONE) {
InjectedFault injectedFault;
switch (injectedFailure) {
case ThriftServer::InjectedFailure::NONE:
folly::assume_unreachable();
case ThriftServer::InjectedFailure::ERROR:
injectedFault = InjectedFault::ERROR;
break;
case ThriftServer::InjectedFailure::DROP:
injectedFault = InjectedFault::DROP;
break;
case ThriftServer::InjectedFailure::DISCONNECT:
injectedFault = InjectedFault::DISCONNECT;
break;
}
handleInjectedFault(
makeActiveRequest(
std::move(metadata),
std::move(debugPayload),
createDefaultRequestContext()),
injectedFault);
return;
}
// Look up the method metadata by the method name carried in the request
// metadata.
using PerServiceMetadata = Cpp2Worker::PerServiceMetadata;
const PerServiceMetadata::FindMethodResult methodMetadataResult =
serviceMetadata_->findMethod(
metadata.name_ref()
? metadata.name_ref()->view()
: std::string_view{}); // need to call with empty string_view
// because we still distinguish
// between NotImplemented and
// MetadataNotFound
// Build the request context: copy the base context for this lookup result as
// a new root when one exists, otherwise create an empty context with the same
// root id. The scope guard keeps it installed until this function returns.
auto rootid = requestsRegistry_->genRootId();
auto baseReqCtx =
serviceMetadata_->getBaseContextForRequest(methodMetadataResult);
auto reqCtx = baseReqCtx
? folly::RequestContext::copyAsRoot(*baseReqCtx, rootid)
: std::make_shared<folly::RequestContext>(rootid);
folly::RequestContextScopeGuard rctx(reqCtx);
// A request should not be active until the overload checking is done.
// (Hence makeRequest is called directly here rather than makeActiveRequest;
// the counter is incremented in the branches below.)
auto request = makeRequest(
std::move(metadata), std::move(debugPayload), std::move(reqCtx));
// check if server is overloaded
const auto& headers = request->getTHeader().getHeaders();
const auto& name = request->getMethodName();
// Resource pools are used only when the flags are set and the server's
// resource pool set is non-empty.
bool useResourcePools =
useResourcePoolsFlagsSet() && !serverConfigs_->resourcePoolSet().empty();
if (useResourcePools) {
// Resource-pool path: count the request as active, then only check whether
// the method may be handled. checkOverload() and preprocess() are not
// called in this branch.
serverConfigs_->incActiveRequests();
if (!serverConfigs_->shouldHandleRequestForMethod(name)) {
handleServerNotReady(std::move(request));
return;
}
} else {
// Non-resource-pool path: the overload check is evaluated before the
// active-request counter is incremented; the counter is incremented even
// when the request is then rejected as overloaded.
auto errorCode = serverConfigs_->checkOverload(&headers, &name);
serverConfigs_->incActiveRequests();
if (UNLIKELY(errorCode.has_value())) {
handleRequestOverloadedServer(std::move(request), errorCode.value());
return;
}
if (!serverConfigs_->shouldHandleRequestForMethod(name)) {
handleServerNotReady(std::move(request));
return;
}
// Any preprocess result other than std::monostate is a rejection; map each
// alternative onto the corresponding error response.
auto preprocessResult = serverConfigs_->preprocess(
{headers, name, connContext_, request.get()});
if (UNLIKELY(!std::holds_alternative<std::monostate>(preprocessResult))) {
folly::variant_match(
preprocessResult,
[&](AppClientException& ace) {
handleAppError(
std::move(request), ace.name(), ace.getMessage(), true);
},
[&](AppOverloadedException&) {
handleRequestOverloadedServer(
std::move(request), kAppOverloadedErrorCode);
},
[&](const AppServerException& ase) {
handleAppError(
std::move(request), ase.name(), ase.getMessage(), false);
},
[](std::monostate&) { folly::assume_unreachable(); });
return;
}
}
logSetupConnectionEventsOnce(setupLoggingFlag_, connContext_);
// Record the sampling decision on the request's timestamps. When sampling is
// enabled, store the read-end time captured at the top of this function and
// stamp the start of processing.
auto* cpp2ReqCtx = request->getRequestContext();
auto& timestamps = cpp2ReqCtx->getTimestamps();
timestamps.setStatus(samplingStatus);
if (UNLIKELY(samplingStatus.isEnabled())) {
timestamps.readEnd = readEnd;
timestamps.processBegin = std::chrono::steady_clock::now();
}
// Log monitoring methods that are called over non-monitoring interface so
// that they can be migrated.
LoggingSampler monitoringLogSampler{
THRIFT_FLAG(monitoring_over_user_logging_sample_rate)};
if (UNLIKELY(monitoringLogSampler.isSampled())) {
if (LIKELY(connContext_.getInterfaceKind() != InterfaceKind::MONITORING)) {
auto& methodName = request->getMethodName();
if (UNLIKELY(isMonitoringMethodName(methodName))) {
THRIFT_CONNECTION_EVENT(monitoring_over_user)
.logSampled(connContext_, monitoringLogSampler, [&] {
return folly::dynamic::object("method_name", methodName);
});
}
}
}
// Notify the observer (if any) that the request was admitted.
// NOTE(review): serverConfigs_ is dereferenced unconditionally earlier in
// this function, so this null check looks redundant - confirm.
if (serverConfigs_) {
if (auto* observer = serverConfigs_->getObserver()) {
observer->admittedRequest(&request->getMethodName());
// Expensive operations; happens only when sampling is enabled
if (samplingStatus.isEnabledByServer()) {
if (threadManager_) {
observer->queuedRequests(threadManager_->pendingUpstreamTaskCount());
}
observer->activeRequests(serverConfigs_->getActiveRequests());
}
}
}
const auto protocolId = request->getProtoId();
// Copy interaction information from the metadata onto the request context.
// NOTE(review): `metadata` was passed as an rvalue to makeRequest above; the
// reads below (interactionId, interactionCreate, crc32c, compression) assume
// makeRequest leaves those fields intact - confirm.
if (auto interactionId = metadata.interactionId_ref()) {
cpp2ReqCtx->setInteractionId(*interactionId);
}
if (auto interactionCreate = metadata.interactionCreate_ref()) {
cpp2ReqCtx->setInteractionCreate(*interactionCreate);
DCHECK_EQ(cpp2ReqCtx->getInteractionId(), 0);
cpp2ReqCtx->setInteractionId(*interactionCreate->interactionId_ref());
}
// When a checksum was present, `data` has already been uncompressed above, so
// it is tagged as NONE; otherwise the declared compression algorithm is
// carried along (NONE if unset).
auto serializedCompressedRequest = SerializedCompressedRequest(
std::move(data),
metadata.crc32c_ref()
? CompressionAlgorithm::NONE
: metadata.compression_ref().value_or(CompressionAlgorithm::NONE));
// Dispatch according to the alternative held by the method metadata lookup
// result. Any exception escaping this section is logged, not rethrown.
try {
if (auto* found = std::get_if<PerServiceMetadata::MetadataFound>(
&methodMetadataResult);
LIKELY(found != nullptr)) {
if (useResourcePools) {
// Resource-pool dispatch. A missing ServiceRequestInfo entry (or a null
// map) is answered with an unknown-method error.
const ServiceRequestInfo* serviceRequestInfo = serviceRequestInfoMap_
? folly::get_ptr(
serviceRequestInfoMap_->get(), request->getMethodName())
: nullptr;
if (!serviceRequestInfo) {
// The name view is taken before `request` is moved into the call.
std::string_view methodName = request->getMethodName();
AsyncProcessorHelper::sendUnknownMethodError(
std::move(request), methodName);
return;
}
ServerRequest serverRequest(
std::move(request),
std::move(serializedCompressedRequest),
eventBase_,
cpp2ReqCtx,
protocolId,
folly::RequestContext::saveContext(),
processor_.get(),
&found->metadata,
serviceRequestInfo);
// Once we remove the old code we'll move validateRpcKind to a helper.
// NOTE(review): nothing is sent from here when validation fails;
// presumably validateRpcKind responds to the client itself - confirm.
if (!GeneratedAsyncProcessor::validateRpcKind(
serverRequest.request(),
serverRequest.requestInfo()->rpcKind)) {
return;
}
// Pick a resource pool. A rejection is answered with
// kAppOverloadedErrorCode, except for UNKNOWN_METHOD rejections which use
// kMethodUnknownErrorCode.
auto poolResult = AsyncProcessorHelper::selectResourcePool(
serverRequest, found->metadata);
if (auto* reject = std::get_if<ServerRequestRejection>(&poolResult)) {
auto errorCode = kAppOverloadedErrorCode;
if (reject->applicationException().getType() ==
TApplicationException::UNKNOWN_METHOD) {
errorCode = kMethodUnknownErrorCode;
}
serverRequest.request()->sendErrorWrapped(
folly::exception_wrapper(
folly::in_place, std::move(*reject).applicationException()),
errorCode);
return;
}
// NOTE(review): the get_if result is dereferenced without a null check;
// this presumes poolResult holds a pool handle whenever it is not a
// rejection - confirm the variant has no other alternatives.
auto resourcePoolHandle =
std::get_if<std::reference_wrapper<const ResourcePoolHandle>>(
&poolResult);
DCHECK(serverConfigs_->resourcePoolSet().hasResourcePool(
*resourcePoolHandle));
auto& resourcePool =
serverConfigs_->resourcePoolSet().resourcePool(*resourcePoolHandle);
// Attach the pool's executor to the request (nullptr if the pool has
// none).
apache::thrift::detail::ServerRequestHelper::setExecutor(
serverRequest, resourcePool.executor().value_or(nullptr));
// Offer the request to the pool; a non-empty result is treated as a
// rejection and answered with kQueueOverloadedErrorCode.
// NOTE(review): serverRequest is read after being passed through
// std::move to accept(); this assumes accept() leaves it intact when it
// rejects - confirm.
auto result = resourcePool.accept(std::move(serverRequest));
if (result) {
auto errorCode = kQueueOverloadedErrorCode;
serverRequest.request()->sendErrorWrapped(
folly::exception_wrapper(
folly::in_place,
std::move(std::move(result).value()).applicationException()),
errorCode);
return;
}
} else {
// Without resource pools, hand the request and the found method metadata
// straight to the processor.
processor_->processSerializedCompressedRequestWithMetadata(
std::move(request),
std::move(serializedCompressedRequest),
found->metadata,
protocolId,
cpp2ReqCtx,
eventBase_,
threadManager_.get());
}
} else if (std::holds_alternative<
PerServiceMetadata::MetadataNotImplemented>(
methodMetadataResult)) {
// The AsyncProcessorFactory does not implement createMethodMetadata
// so we need to fallback to processSerializedCompressedRequest.
processor_->processSerializedCompressedRequest(
std::move(request),
std::move(serializedCompressedRequest),
protocolId,
cpp2ReqCtx,
eventBase_,
threadManager_.get());
} else if (std::holds_alternative<PerServiceMetadata::MetadataNotFound>(
methodMetadataResult)) {
// No metadata for this method name: answer with an unknown-method error.
// The name view is taken before `request` is moved into the call.
std::string_view methodName = request->getMethodName();
AsyncProcessorHelper::sendUnknownMethodError(
std::move(request), methodName);
} else {
// None of the expected alternatives matched.
LOG(FATAL) << "Invalid PerServiceMetadata from Cpp2Worker";
}
} catch (...) {
// Exceptions from the dispatch section are logged and not propagated.
LOG(DFATAL) << "AsyncProcessor::process exception: "
<< folly::exceptionStr(std::current_exception());
}
}