in thrift/lib/cpp2/transport/rocket/server/ThriftRocketServerHandler.cpp [119:252]
// Processes the SETUP frame that opens a Rocket connection.
//
// In order, this:
//  1. Rejects frames that carry no metadata.
//  2. Reads the leading 32-bit protocol key. A missing or unrecognized key is
//     tolerated only when the frame uses Rocket MIME types; in that case the
//     bytes already consumed are handed back to the metadata reader.
//  3. Deserializes RequestSetupMetadata with the compact protocol, records it
//     on the connection context and negotiates the Rocket version.
//  4. Offers the metadata to each registered setup frame handler. The first
//     handler that produces processor info configures this object and the
//     function returns immediately, without running steps 5 and 6.
//  5. Otherwise installs the server's default processor, thread manager,
//     configs and requests registry, and picks up the observer sample rate.
//  6. Applies DSCP/mark reflection if requested and sends a metadata push
//     carrying the negotiated version.
//
// Every rejection closes `connection` with ErrorCode::INVALID_SETUP. Any
// std::exception escaping the try block (not only deserialization errors) is
// reported the same way with an "Error deserializing SETUP payload" prefix.
void ThriftRocketServerHandler::handleSetupFrame(
    SetupFrame&& frame, RocketServerConnection& connection) {
  // Single place that builds the INVALID_SETUP exception and closes the
  // connection; the message is forwarded untouched to RocketException.
  const auto rejectSetup = [&connection](auto&& message) {
    connection.close(folly::make_exception_wrapper<RocketException>(
        ErrorCode::INVALID_SETUP, std::forward<decltype(message)>(message)));
  };

  if (!frame.payload().hasNonemptyMetadata()) {
    return rejectSetup("Missing required metadata in SETUP frame");
  }

  folly::io::Cursor cursor(frame.payload().buffer());

  // Validate Thrift protocol key. `protocolKey` is only inspected when the
  // read succeeded, so it is never consulted while uninitialized.
  uint32_t protocolKey;
  const bool keyWasRead = cursor.tryReadBE<uint32_t>(protocolKey);
  constexpr uint32_t kLegacyRocketProtocolKey = 1;
  const bool keyRecognized = keyWasRead &&
      ((THRIFT_FLAG(rocket_server_legacy_protocol_key) &&
        protocolKey == kLegacyRocketProtocolKey) ||
       protocolKey == RpcMetadata_constants::kRocketProtocolKey());
  if (!keyRecognized) {
    if (!frame.rocketMimeTypes()) {
      return rejectSetup("Incompatible Thrift version");
    }
    // If Rocket MIME types are used the protocol key is optional, so whatever
    // was consumed as a key is given back to the metadata reader.
    if (keyWasRead) {
      cursor.retreat(sizeof(protocolKey));
    }
  }

  try {
    CompactProtocolReader reader;
    reader.setInput(cursor);
    RequestSetupMetadata meta;
    // Throws on read error
    meta.read(&reader);
    // The reader must not have consumed more bytes than the frame declares
    // as metadata.
    if (reader.getCursorPosition() > frame.payload().metadataSize()) {
      return rejectSetup("Error deserializing SETUP payload: underflow");
    }
    connContext_.readSetupMetadata(meta);

    // Unset version bounds are treated as 0.
    const auto clientMinVersion = meta.minVersion_ref().value_or(0);
    const auto clientMaxVersion = meta.maxVersion_ref().value_or(0);

    THRIFT_CONNECTION_EVENT(rocket.setup).log(connContext_, [&] {
      return folly::dynamic::object("client_min_version", clientMinVersion)(
          "client_max_version", clientMaxVersion)("server_version", version_);
    });

    // The client's [min, max] range has to overlap what this server speaks.
    if (clientMinVersion > version_ ||
        clientMaxVersion < kRocketServerMinVersion) {
      return rejectSetup("Incompatible Rocket version");
    }
    version_ = std::min(version_, clientMaxVersion);

    // From version 9 onwards Rocket MIME types are mandatory.
    if (version_ >= 9 && !frame.rocketMimeTypes()) {
      return rejectSetup("Unsupported MIME types");
    }

    eventBase_ = connContext_.getTransport()->getEventBase();

    for (const auto& handler : setupFrameHandlers_) {
      auto customInfo = handler->tryHandle(meta);
      if (!customInfo) {
        continue;
      }

      // This handler claimed the connection: adopt everything it supplies,
      // then validate. All members are assigned before the validity check,
      // even when an earlier piece turned out to be missing.
      processorFactory_ = std::addressof(customInfo->processorFactory_);
      serviceMetadata_ =
          std::addressof(worker_->getMetadataForService(*processorFactory_));
      serviceRequestInfoMap_ = processorFactory_->getServiceRequestInfoMap();

      processor_ = processorFactory_->getProcessor();
      const bool hasProcessor = !!processor_;

      threadManager_ = std::move(customInfo->threadManager_);
      // Allow no thread manager if resource pools in use
      const bool hasExecution = !!threadManager_ || useResourcePoolsFlagsSet();

      serverConfigs_ = &customInfo->serverConfigs_;
      const bool hasConfigs = !!serverConfigs_;

      // Fall back to the worker's registry when the handler gave none.
      requestsRegistry_ = customInfo->requestsRegistry_ != nullptr
          ? customInfo->requestsRegistry_
          : worker_->getRequestsRegistry();

      if (!(hasProcessor && hasExecution && hasConfigs)) {
        return rejectSetup(
            "Error in implementation of custom connection handler.");
      }
      // Custom handlers end setup here; the default wiring and the setup
      // response push below are not executed on this path.
      return;
    }

    // no custom frame handler was found, do the default
    processorFactory_ =
        std::addressof(worker_->getServer()->getDecoratedProcessorFactory());
    serviceMetadata_ =
        std::addressof(worker_->getMetadataForService(*processorFactory_));
    serviceRequestInfoMap_ = processorFactory_->getServiceRequestInfoMap();
    processor_ = processorFactory_->getProcessor();
    if (!useResourcePoolsFlagsSet()) {
      threadManager_ = worker_->getServer()->getThreadManager();
    }
    serverConfigs_ = worker_->getServer();
    requestsRegistry_ = worker_->getRequestsRegistry();

    // add sampleRate
    if (serverConfigs_) {
      auto* observer = serverConfigs_->getObserver();
      if (observer) {
        sampleRate_ = observer->getSampleRate();
      }
    }

    if (meta.dscpToReflect_ref() || meta.markToReflect_ref()) {
      connection.applyDscpAndMarkToSocket(meta);
    }

    DCHECK_GE(version_, 5);

    // Tell the client which version was negotiated and that zstd is
    // supported.
    ServerPushMetadata pushMetadata;
    pushMetadata.set_setupResponse();
    pushMetadata.setupResponse_ref()->version_ref() = version_;
    pushMetadata.setupResponse_ref()->zstdSupported_ref() = true;

    CompactProtocolWriter writer;
    folly::IOBufQueue responseQueue;
    writer.setOutput(&responseQueue);
    pushMetadata.write(&writer);
    connection.sendMetadataPush(std::move(responseQueue).move());
  } catch (const std::exception& ex) {
    return rejectSetup(fmt::format(
        "Error deserializing SETUP payload: {}",
        folly::exceptionStr(ex).toStdString()));
  }
}