in quic/server/QuicServerWorker.cpp [552:858]
/**
 * Route one batch of received packet data to the transport that owns it,
 * creating a new server transport when the data is an acceptable Initial.
 *
 * Resolution order:
 *   1. Look up routingData.destinationConnId in connectionIdMap_.
 *   2. On a miss, short-header packets are marked for drop; long-header
 *      packets are looked up in sourceAddressMap_ by
 *      (client address, destination connection id).
 *   3. On a second miss: 0-RTT data is buffered in pending0RttData_,
 *      non-Initial data is marked for drop, and Initial data goes through
 *      size / token / rate-limit checks before a transport is created.
 *
 * Data that ends up marked for drop is then either dropped, answered with a
 * reset packet, or forwarded to another server process, depending on what the
 * destination connection id decodes to and on the forwarding configuration.
 *
 * @param client          Address the data was received from.
 * @param routingData     Parsed routing fields of the packet (header form,
 *                        connection ids, Initial / 0-RTT flags).
 * @param networkData     The received data; it is moved into the transport,
 *                        the 0-RTT buffer, or the forwarding handler.
 * @param quicVersion     Version used to create a new transport. Must hold a
 *                        value whenever a new transport has to be created.
 * @param isForwardedData True if this data was already forwarded to this
 *                        worker; such data is never forwarded again.
 */
void QuicServerWorker::dispatchPacketData(
    const folly::SocketAddress& client,
    RoutingData&& routingData,
    NetworkData&& networkData,
    folly::Optional<QuicVersion> quicVersion,
    bool isForwardedData) noexcept {
  DCHECK(socket_);
  QuicServerTransport::Ptr transport;
  bool dropPacket = false;
  auto cit = connectionIdMap_.find(routingData.destinationConnId);
  if (cit != connectionIdMap_.end()) {
    transport = cit->second;
    VLOG(10) << "Found existing connection for CID="
             << routingData.destinationConnId.hex() << " " << *transport;
  } else if (routingData.headerForm != HeaderForm::Long) {
    // Only long-header packets may be routed by source address below, so a
    // non-long-header packet with an unknown connection id is dropped here.
    VLOG(3) << fmt::format(
        "Dropping non-long header packet with no connid match"
        " headerForm={}, routingInfo={}",
        static_cast<typename std::underlying_type<HeaderForm>::type>(
            routingData.headerForm),
        logRoutingInfo(routingData.destinationConnId));
    // The drop handling at the end of this function may still forward the
    // packet to the old server (if forwarding is enabled).
    dropPacket = true;
  }

  bool cannotMakeTransport = false;
  if (!dropPacket && !transport) {
    // For LongHeader packets without existing associated connection, try to
    // route with destinationConnId chosen by the peer and IP address of the
    // peer.
    CHECK(transportFactory_);
    auto source = std::make_pair(client, routingData.destinationConnId);
    auto sit = sourceAddressMap_.find(source);
    if (sit == sourceAddressMap_.end()) {
      if (routingData.is0Rtt) {
        // If it's a 0RTT packet and we have no CID, we probably lost the
        // initial and want to buffer it for a while.
        auto itr = pending0RttData_.find(routingData.destinationConnId);
        if (itr == pending0RttData_.end()) {
          itr =
              pending0RttData_.insert(routingData.destinationConnId, {}).first;
        }
        auto& vec = itr->second;
        // Once the per-connection-id buffer is full, further 0-RTT data is
        // silently discarded.
        if (vec.size() != vec.max_size()) {
          vec.emplace_back(std::move(networkData));
          QUIC_STATS(statsCallback_, onZeroRttBuffered);
        }
        return;
      } else if (!routingData.isInitial) {
        VLOG(3) << fmt::format(
            "Dropping packet from client={}, routingInfo={}",
            client.describe(),
            logRoutingInfo(routingData.destinationConnId));
        dropPacket = true;
      } else {
        VLOG(4) << fmt::format(
            "Creating new connection for client={}, routingInfo={}",
            client.describe(),
            logRoutingInfo(routingData.destinationConnId));
        // This could be a new connection, add it in the map.
        // Verify that the initial packet is at least min initial bytes
        // to avoid amplification attacks.
        if (networkData.totalData < kMinInitialPacketSize) {
          // Don't even attempt to forward the packet, just drop it.
          VLOG(3) << "Dropping small initial packet from client=" << client;
          QUIC_STATS(
              statsCallback_,
              onPacketDropped,
              PacketDropReason::INVALID_PACKET);
          return;
        }

        // If there is a token present, decrypt it (could be either a retry
        // token or a new token).
        folly::io::Cursor cursor(networkData.packets.front().get());
        auto maybeEncryptedToken = maybeGetEncryptedToken(cursor);
        bool hasTokenSecret = transportSettings_.retryTokenSecret.hasValue();

        // If the retryTokenSecret is not set, just skip evaluating validity of
        // token and assume true.
        auto isValidRetryToken = !hasTokenSecret ||
            (maybeEncryptedToken &&
             validRetryToken(
                 *maybeEncryptedToken,
                 routingData.destinationConnId,
                 client.getIPAddress()));
        auto isValidNewToken = !hasTokenSecret ||
            (maybeEncryptedToken &&
             validNewToken(*maybeEncryptedToken, client.getIPAddress()));

        if (isValidNewToken) {
          QUIC_STATS(statsCallback_, onNewTokenReceived);
        } else if (maybeEncryptedToken && !isValidRetryToken) {
          // Failed to decrypt the token as either a new or retry token.
          QUIC_STATS(statsCallback_, onTokenDecryptFailure);
        }

        // If a limit is exceeded (new-connection rate limiter or the
        // unfinished-handshake cap) and there is no valid retry token, send a
        // retry packet back to the client instead of accepting.
        //
        // isValidRetryToken can only be false when a retry token secret is
        // configured (see its definition above), so no separate check for the
        // secret is needed before sending the retry packet.
        if (!isValidRetryToken &&
            ((newConnRateLimiter_ &&
              newConnRateLimiter_->check(networkData.receiveTimePoint)) ||
             (unfinishedHandshakeLimitFn_.has_value() &&
              globalUnfinishedHandshakes >=
                  (*unfinishedHandshakeLimitFn_)()))) {
          sendRetryPacket(
              client,
              routingData.destinationConnId,
              routingData.sourceConnId.value_or(
                  ConnectionId(std::vector<uint8_t>())));
          QUIC_STATS(statsCallback_, onConnectionRateLimited);
          return;
        }

        // Check that we have a proper quic version before creating transport.
        CHECK(quicVersion.has_value())
            << "no QUIC version supplied for transport creation";

        // Create 'accepting' transport.
        auto sock = makeSocket(getEventBase());
        auto trans = transportFactory_->make(
            getEventBase(), std::move(sock), client, quicVersion.value(), ctx_);
        if (!trans) {
          dropPacket = true;
          cannotMakeTransport = true;
        } else {
          globalUnfinishedHandshakes++;
          if (transportSettings_.dataPathType ==
                  DataPathType::ContinuousMemory &&
              bufAccessor_) {
            trans->setBufAccessor(bufAccessor_.get());
          }
          trans->setPacingTimer(pacingTimer_);
          trans->setRoutingCallback(this);
          trans->setHandshakeFinishedCallback(this);
          trans->setSupportedVersions(supportedVersions_);
          trans->setOriginalPeerAddress(client);
#ifdef CCP_ENABLED
          trans->setCcpDatapath(getCcpReader()->getDatapath());
#endif
          trans->setCongestionControllerFactory(ccFactory_);
          if (statsCallback_) {
            trans->setTransportStatsCallback(statsCallback_.get());
          }

          // Let the optional override hook pick per-client transport settings.
          auto overriddenSettings = transportSettingsOverrideFn_
              ? transportSettingsOverrideFn_(
                    transportSettings_, client.getIPAddress())
              : folly::none;
          if (overriddenSettings) {
            if (overriddenSettings->dataPathType !=
                transportSettings_.dataPathType) {
              // It's too complex to support that. The mismatch is only
              // logged; the overridden settings are still applied below.
              LOG(ERROR)
                  << "Overriding DataPathType isn't supported. Requested datapath="
                  << (overriddenSettings->dataPathType ==
                              DataPathType::ContinuousMemory
                          ? "ContinuousMemory"
                          : "ChainedMemory");
            }
            trans->setTransportSettings(*overriddenSettings);
          } else {
            trans->setTransportSettings(transportSettings_);
          }

          trans->setConnectionIdAlgo(connIdAlgo_.get());
          trans->setServerConnectionIdRejector(this);
          if (routingData.sourceConnId) {
            trans->setClientConnectionId(*routingData.sourceConnId);
          }
          trans->setClientChosenDestConnectionId(routingData.destinationConnId);

          // Parameters to create server chosen connection id.
          ServerConnectionIdParams serverConnIdParams(
              cidVersion_,
              hostId_,
              static_cast<uint8_t>(processId_),
              workerId_);
          trans->setServerConnectionIdParams(std::move(serverConnIdParams));
          trans->accept();

          auto result = sourceAddressMap_.emplace(std::make_pair(
              std::make_pair(client, routingData.destinationConnId), trans));
          if (!result.second) {
            LOG(ERROR) << fmt::format(
                "Routing entry already exists for client={}, routingInfo={}",
                client.describe(),
                logRoutingInfo(routingData.destinationConnId));
            dropPacket = true;
          } else {
            for (const auto& observer : observerList_.getAll()) {
              observer->accept(trans.get());
            }
          }
          transport = trans;
        }
      }
    } else {
      transport = sit->second;
      VLOG(4) << "Found existing connection for client=" << client << " "
              << *transport;
    }
  }

  if (!dropPacket) {
    DCHECK(transport->getEventBase()->isInEventBaseThread());
    transport->onNetworkData(client, std::move(networkData));
    // If we had pending 0RTT data for this DCID, process it.
    if (routingData.isInitial && !pending0RttData_.empty()) {
      auto itr = pending0RttData_.find(routingData.destinationConnId);
      if (itr != pending0RttData_.end()) {
        for (auto& data : itr->second) {
          transport->onNetworkData(client, std::move(data));
        }
        pending0RttData_.erase(itr);
      }
    }
    return;
  }

  // Everything below handles data that could not be delivered to a transport.
  if (cannotMakeTransport) {
    VLOG(3)
        << "Dropping packet due to transport factory did not make transport";
    QUIC_STATS(
        statsCallback_,
        onPacketDropped,
        PacketDropReason::CANNOT_MAKE_TRANSPORT);
    return;
  }

  if (!connIdAlgo_->canParse(routingData.destinationConnId)) {
    VLOG(3) << "Dropping packet with bad DCID, routingInfo="
            << logRoutingInfo(routingData.destinationConnId);
    QUIC_STATS(statsCallback_, onPacketDropped, PacketDropReason::PARSE_ERROR);
    // TODO do we need to reset?
    return;
  }

  auto connIdParam =
      connIdAlgo_->parseConnectionId(routingData.destinationConnId);
  if (connIdParam.hasError()) {
    VLOG(3) << fmt::format(
        "Dropping packet due to DCID parsing error={}, errorCode={}, routingInfo={}",
        connIdParam.error().what(),
        folly::to<std::string>(connIdParam.error().errorCode()),
        logRoutingInfo(routingData.destinationConnId));
    QUIC_STATS(statsCallback_, onPacketDropped, PacketDropReason::PARSE_ERROR);
    // TODO do we need to reset?
    return;
  }

  if (connIdParam->hostId != hostId_) {
    VLOG_EVERY_N(2, 100) << fmt::format(
        "Dropping packet routed to wrong host, from client={}, routingInfo={}",
        client.describe(),
        logRoutingInfo(routingData.destinationConnId));
    QUIC_STATS(
        statsCallback_,
        onPacketDropped,
        PacketDropReason::ROUTING_ERROR_WRONG_HOST);
    return sendResetPacket(
        routingData.headerForm,
        client,
        networkData,
        routingData.destinationConnId);
  }

  // Send a Reset rather than forwarding when any of these holds:
  //  - packet forwarding is disabled,
  //  - the data was itself forwarded to us (never forward twice),
  //  - the connection id names this very process, i.e. there's no existing
  //    connection for the packet's CID or the client's addr, and it doesn't
  //    belong to the old server.
  if (!packetForwardingEnabled_ || isForwardedData ||
      connIdParam->processId == static_cast<uint8_t>(processId_)) {
    QUIC_STATS(
        statsCallback_,
        onPacketDropped,
        PacketDropReason::CONNECTION_NOT_FOUND);
    return sendResetPacket(
        routingData.headerForm,
        client,
        networkData,
        routingData.destinationConnId);
  }

  // Optimistically route to another server
  // if the packet type is not Initial and if there is not any connection
  // associated with the given packet.
  VLOG(4) << fmt::format(
      "Forwarding packet from client={} to another process, routingInfo={}",
      client.describe(),
      logRoutingInfo(routingData.destinationConnId));
  // Read the receive time before networkData is moved from.
  auto recvTime = networkData.receiveTimePoint;
  takeoverPktHandler_.forwardPacketToAnotherServer(
      client, std::move(networkData).moveAllData(), recvTime);
  QUIC_STATS(statsCallback_, onPacketForwarded);
}