mcrouter/lib/network/CaretProtocol.cpp (171 lines of code) (raw):

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 */

#include "mcrouter/lib/network/CaretProtocol.h"

#include <folly/GroupVarint.h>
#include <folly/Range.h>
#include <folly/Varint.h>

#include "mcrouter/lib/network/CaretHeader.h"
#include "mcrouter/lib/network/ServerLoad.h"

namespace facebook {
namespace memcache {

namespace {

/**
 * Set every additional (optional) header field of `info` to its zero value.
 *
 * Called by the parser before decoding the additional fields, so that fields
 * absent from the incoming header do not keep values from a previous use of
 * the same CaretMessageInfo object.
 */
void resetAdditionalFields(CaretMessageInfo& info) {
  info.traceId = {0, 0};
  info.supportedCodecsFirstId = 0;
  info.supportedCodecsSize = 0;
  info.usedCodecId = 0;
  info.uncompressedBodySize = 0;
  info.dropProbability = 0;
  info.serverLoad = ServerLoad::zero();
}

/**
 * Count the additional fields of `info` that hold a non-zero value.
 *
 * Only non-zero fields are written by serializeAdditionalFields(), so this is
 * the number of (type, value) pairs that will follow the fixed header.
 */
size_t getNumAdditionalFields(const CaretMessageInfo& info) {
  size_t nAdditionalFields = 0;
  if (info.traceId.first != 0) {
    ++nAdditionalFields;
  }
  if (info.traceId.second != 0) {
    ++nAdditionalFields;
  }
  if (info.supportedCodecsFirstId != 0) {
    ++nAdditionalFields;
  }
  if (info.supportedCodecsSize != 0) {
    ++nAdditionalFields;
  }
  if (info.usedCodecId != 0) {
    ++nAdditionalFields;
  }
  if (info.uncompressedBodySize != 0) {
    ++nAdditionalFields;
  }
  if (info.dropProbability != 0) {
    ++nAdditionalFields;
  }
  if (!info.serverLoad.isZero()) {
    ++nAdditionalFields;
  }
  return nAdditionalFields;
}

/**
 * Serialize one additional field as a (type, value) pair of varints, but only
 * if `value` is non-zero.
 *
 * No bounds checking is done here: the caller must make sure `destination`
 * has room for both varints.
 *
 * @param destination  Where to write the encoded pair.
 * @param type         Field identifier, written first.
 * @param value        Field value, written second.
 * @return Number of bytes written (0 when `value` is zero).
 */
size_t serializeAdditionalFieldIfNonZero(
    uint8_t* destination,
    CaretAdditionalFieldType type,
    uint64_t value) {
  uint8_t* buf = destination;
  if (value > 0) {
    buf += folly::encodeVarint(static_cast<uint64_t>(type), buf);
    buf += folly::encodeVarint(value, buf);
  }
  return buf - destination;
}

/**
 * Serialize all non-zero additional fields of `info` back to back.
 *
 * The set of fields written here matches the set counted by
 * getNumAdditionalFields().
 *
 * @param destination  Where to write; the caller guarantees enough space.
 * @param info         Source of the field values.
 * @return Total number of bytes written.
 */
size_t serializeAdditionalFields(
    uint8_t* destination,
    const CaretMessageInfo& info) {
  uint8_t* buf = destination;

  buf += serializeAdditionalFieldIfNonZero(
      buf, CaretAdditionalFieldType::TRACE_ID, info.traceId.first);
  buf += serializeAdditionalFieldIfNonZero(
      buf, CaretAdditionalFieldType::TRACE_NODE_ID, info.traceId.second);
  buf += serializeAdditionalFieldIfNonZero(
      buf,
      CaretAdditionalFieldType::SUPPORTED_CODECS_FIRST_ID,
      info.supportedCodecsFirstId);
  buf += serializeAdditionalFieldIfNonZero(
      buf,
      CaretAdditionalFieldType::SUPPORTED_CODECS_SIZE,
      info.supportedCodecsSize);
  buf += serializeAdditionalFieldIfNonZero(
      buf, CaretAdditionalFieldType::USED_CODEC_ID, info.usedCodecId);
  buf += serializeAdditionalFieldIfNonZero(
      buf,
      CaretAdditionalFieldType::UNCOMPRESSED_BODY_SIZE,
      info.uncompressedBodySize);
  buf += serializeAdditionalFieldIfNonZero(
      buf, CaretAdditionalFieldType::DROP_PROBABILITY, info.dropProbability);
  buf += serializeAdditionalFieldIfNonZero(
      buf, CaretAdditionalFieldType::SERVER_LOAD, info.serverLoad.raw());

  return buf - destination;
}

/**
 * Decode one varint from the front of `range` into `out`.
 *
 * On success `range` is advanced past the varint. A failure is translated
 * into the matching parse status so that a malformed varint is not mistaken
 * for a truncated one.
 *
 * @return ParseStatus::Ok on success;
 *         ParseStatus::MessageParseError if the varint is too long to be
 *         valid (more input cannot fix it);
 *         ParseStatus::NotEnoughData if `range` ended mid-varint.
 */
ParseStatus decodeVarintField(folly::StringPiece& range, uint64_t& out) {
  auto decoded = folly::tryDecodeVarint(range);
  if (decoded) {
    out = *decoded;
    return ParseStatus::Ok;
  }
  return decoded.error() == folly::DecodeVarintError::TooManyBytes
      ? ParseStatus::MessageParseError
      : ParseStatus::NotEnoughData;
}

} // anonymous namespace

/**
 * Parse a Caret message header from the start of `buff`.
 *
 * Layout consumed: one magic byte, a GroupVarint32 group holding
 * (bodySize, typeId, reqId, number of additional fields), then that many
 * (type, value) varint pairs.
 *
 * @param buff        Start of the received data.
 * @param nbuf        Number of readable bytes at `buff`.
 * @param headerInfo  Filled with the decoded header. It may be partially
 *                    overwritten even when the result is not Ok.
 * @return ParseStatus::Ok when the full header was decoded (headerSize is set
 *         to the number of bytes consumed);
 *         ParseStatus::NotEnoughData when more bytes are needed;
 *         ParseStatus::MessageParseError when the magic byte is wrong or an
 *         additional-field varint is malformed.
 */
ParseStatus caretParseHeader(
    const uint8_t* buff,
    size_t nbuf,
    CaretMessageInfo& headerInfo) {
  /* we need the magic byte and the first byte of encoded header
     to determine if we have enough data in the buffer to get the
     entire header */
  if (nbuf < 2) {
    return ParseStatus::NotEnoughData;
  }

  if (buff[0] != kCaretMagicByte) {
    return ParseStatus::MessageParseError;
  }

  const char* buf = reinterpret_cast<const char*>(buff);
  size_t encodedLength = folly::GroupVarint32::encodedSize(buf + 1);

  // +1 accounts for the magic byte preceding the GroupVarint group.
  if (nbuf < encodedLength + 1) {
    return ParseStatus::NotEnoughData;
  }

  uint32_t additionalFields = 0;
  folly::GroupVarint32::decode_simple(
      buf + 1,
      &headerInfo.bodySize,
      &headerInfo.typeId,
      &headerInfo.reqId,
      &additionalFields);

  folly::StringPiece range(buf, nbuf);
  range.advance(encodedLength + 1);

  // Additional fields are sequence of (key,value) pairs
  resetAdditionalFields(headerInfo);
  for (uint32_t i = 0; i < additionalFields; i++) {
    // Keep the full 64-bit decoded values; narrowing, if any, happens only
    // when a value is stored into its destination field.
    uint64_t fieldType = 0;
    ParseStatus status = decodeVarintField(range, fieldType);
    if (status != ParseStatus::Ok) {
      return status;
    }

    uint64_t fieldValue = 0;
    status = decodeVarintField(range, fieldValue);
    if (status != ParseStatus::Ok) {
      return status;
    }

    if (fieldType >
        static_cast<uint64_t>(CaretAdditionalFieldType::SERVER_LOAD)) {
      // Additional Field Type not recognized, ignore.
      continue;
    }

    switch (static_cast<CaretAdditionalFieldType>(fieldType)) {
      case CaretAdditionalFieldType::TRACE_ID:
        headerInfo.traceId.first = fieldValue;
        break;
      case CaretAdditionalFieldType::TRACE_NODE_ID:
        headerInfo.traceId.second = fieldValue;
        break;
      case CaretAdditionalFieldType::SUPPORTED_CODECS_FIRST_ID:
        headerInfo.supportedCodecsFirstId = fieldValue;
        break;
      case CaretAdditionalFieldType::SUPPORTED_CODECS_SIZE:
        headerInfo.supportedCodecsSize = fieldValue;
        break;
      case CaretAdditionalFieldType::USED_CODEC_ID:
        headerInfo.usedCodecId = fieldValue;
        break;
      case CaretAdditionalFieldType::UNCOMPRESSED_BODY_SIZE:
        headerInfo.uncompressedBodySize = fieldValue;
        break;
      case CaretAdditionalFieldType::DROP_PROBABILITY:
        headerInfo.dropProbability = fieldValue;
        break;
      case CaretAdditionalFieldType::SERVER_LOAD:
        headerInfo.serverLoad = ServerLoad(fieldValue);
        break;
    }
  }

  // Everything consumed so far (magic byte + group + additional fields).
  headerInfo.headerSize = range.cbegin() - buf;

  return ParseStatus::Ok;
}

/**
 * Serialize the Caret header described by `info` into `headerBuf`.
 *
 * Writes the magic byte, the GroupVarint32 group
 * (bodySize, typeId, reqId, number of additional fields) and then every
 * non-zero additional field. No bounds checking is done: the caller must
 * provide a buffer large enough for the whole header.
 *
 * @param info       Header contents to serialize.
 * @param headerBuf  Destination buffer.
 * @return Number of bytes written to `headerBuf`.
 */
size_t caretPrepareHeader(const CaretMessageInfo& info, char* headerBuf) {
  // Header is at most kMaxHeaderLength without extra fields.

  const uint32_t bodySize = info.bodySize;
  const uint32_t typeId = info.typeId;
  const uint32_t reqId = info.reqId;

  headerBuf[0] = kCaretMagicByte;

  // Header
  char* additionalFields = folly::GroupVarint32::encode(
      headerBuf + 1,
      bodySize,
      typeId,
      reqId,
      static_cast<uint32_t>(getNumAdditionalFields(info)));

  // Additional fields
  additionalFields += serializeAdditionalFields(
      reinterpret_cast<uint8_t*>(additionalFields), info);

  return additionalFields - headerBuf;
}

} // namespace memcache
} // namespace facebook