in thrift/lib/cpp/transport/THeader.cpp [823:986]
// Wraps an outgoing payload in whatever framing the current clientType_
// requires and returns the resulting IOBuf chain.
//
//   buf                     payload to send; ownership is taken.
//   persistentWriteHeaders  persistent key/value headers; forwarded to the
//                           header-format writer or to the HTTP client parser.
//   transform               when true and the client speaks the header
//                           protocol, the payload is first passed through
//                           THeader::transform().
//
// Throws TTransportException:
//   BAD_ARGS            protocol is JSON but the client type is not the HTTP
//                       server type, or the client type is not recognized.
//   INVALID_FRAME_SIZE  a non-header payload exceeds MAX_FRAME_SIZE, or a
//                       header frame exceeds it while big frames are disabled.
unique_ptr<IOBuf> THeader::addHeader(
    unique_ptr<IOBuf> buf,
    StringToStringMap& persistentWriteHeaders,
    bool transform) {
  // The transform list may have to be adjusted for this particular send, so
  // operate on a private copy instead of on writeTrans_ itself.
  std::vector<uint16_t> writeTrans = writeTrans_;
  if (clientType_ == THRIFT_HEADER_CLIENT_TYPE && transform) {
    buf = THeader::transform(std::move(buf), writeTrans);
  }

  // Payload length, measured after any transform has been applied.
  const size_t chainSize = buf->computeChainDataLength();

  if (protoId_ == T_JSON_PROTOCOL && clientType_ != THRIFT_HTTP_SERVER_TYPE) {
    throw TTransportException(
        TTransportException::BAD_ARGS, "Trying to send JSON without HTTP");
  }
  if (clientType_ != THRIFT_HEADER_CLIENT_TYPE && chainSize > MAX_FRAME_SIZE) {
    throw TTransportException(
        TTransportException::INVALID_FRAME_SIZE,
        "Attempting to send non-header frame that is too large");
  }

  // Special headers go in first: everything has to be present in
  // writeHeaders_ before getMaxWriteHeadersSize() is consulted below.
  if (identity_.length() != 0) {
    ensureWriteHeaders();
    (*writeHeaders_)[IDENTITY_HEADER] = identity_;
    (*writeHeaders_)[ID_VERSION_HEADER] = ID_VERSION;
  }

  // ---- Header protocol ----------------------------------------------------
  if (clientType_ == THRIFT_HEADER_CLIENT_TYPE) {
    // Upper bound for the variable-length part of the header. Varints are
    // sized for their 5-byte worst case, and 4 bytes are reserved for padding;
    // the exact size is only known once everything has been written.
    int maxHeaderSize =
        (2 + getNumTransforms(writeTrans) * 2 /* transform data */) * 5 + 4;
    // Plus the approximate size of the info headers.
    maxHeaderSize += getMaxWriteHeadersSize(persistentWriteHeaders);

    // 22 fixed bytes: 8 of headroom (only consumed by the big-frame prefix)
    // followed by the 14-byte common section written below
    // (4 frame size + 2 magic + 2 flags + 4 sequence id + 2 header size).
    unique_ptr<IOBuf> frame = IOBuf::create(22 + maxHeaderSize);
    frame->advance(8);

    uint8_t* const frameStart = frame->writableData();
    uint8_t* cursor = frameStart;

    // Frame size: slot reserved now, value patched in at the end.
    cursor += sizeof(uint32_t);

    // Upper 16 bits of the header magic, big-endian.
    const uint16_t magicN = folly::Endian::big<uint16_t>(HEADER_MAGIC >> 16);
    memcpy(cursor, &magicN, sizeof(magicN));
    cursor += sizeof(magicN);

    const uint16_t flagsN = folly::Endian::big(flags_);
    memcpy(cursor, &flagsN, sizeof(flagsN));
    cursor += sizeof(flagsN);

    const uint32_t seqIdN = folly::Endian::big(seqId_);
    memcpy(cursor, &seqIdN, sizeof(seqIdN));
    cursor += sizeof(seqIdN);

    // Header size: slot reserved now, value patched in at the end.
    uint8_t* const headerSizeSlot = cursor;
    cursor += sizeof(uint16_t);

    // Variable-length section: protocol id, transform count, transform ids.
    uint8_t* const headerStart = cursor;
    cursor += writeVarint32(protoId_, cursor);
    cursor += writeVarint32(getNumTransforms(writeTrans), cursor);
    for (const auto& transId : writeTrans) {
      cursor += writeVarint32(transId, cursor);
    }

    // Info headers: persistent key/values first, then the per-message ones.
    flushInfoHeaders(cursor, persistentWriteHeaders, infoIdType::PKEYVALUE);
    if (writeHeaders_) {
      flushInfoHeaders(cursor, *writeHeaders_, infoIdType::KEYVALUE);
    }
    if (extraWriteHeaders_) {
      flushInfoHeaders(
          cursor, *extraWriteHeaders_, infoIdType::KEYVALUE, false);
    }

    // TODO(davejwatson) optimize this for writing twice/memcopy to pkt buffer.
    // See code in TBufferTransports

    // The real header length is now known. Pad it with zero bytes up to the
    // next multiple of 4; note that this yields 1..4 bytes of padding, i.e. an
    // already aligned header still receives 4 bytes.
    int headerSize = static_cast<int>(cursor - headerStart);
    const uint8_t padding = 4 - (headerSize % 4);
    headerSize += padding;
    memset(cursor, 0x00, padding);
    cursor += padding;

    assert(cursor - frameStart <= static_cast<ptrdiff_t>(frame->capacity()));

    // Value of the frame-size field: everything after the 4-byte size slot.
    const size_t frameSize = headerSize + chainSize // thrift header + payload
        + (headerStart - frameStart - 4); // common header section

    // The header size is transmitted as a count of 4-byte words.
    const uint16_t headerSizeN = folly::Endian::big<uint16_t>(headerSize / 4);
    memcpy(headerSizeSlot, &headerSizeN, sizeof(headerSizeN));

    if (frameSize <= MAX_FRAME_SIZE) {
      // Regular frame: 32-bit big-endian size in the reserved slot.
      const uint32_t frameSizeN = folly::Endian::big<uint32_t>(frameSize);
      memcpy(frameStart, &frameSizeN, sizeof(frameSizeN));
    } else {
      if (!allowBigFrames_) {
        throw TTransportException(
            TTransportException::INVALID_FRAME_SIZE, "Big frames not allowed");
      }
      // Big frame: claim the 8 bytes of headroom so the frame opens with the
      // 32-bit big-frame magic followed by a 64-bit big-endian size.
      frame->prepend(8);
      uint8_t* const bigFrameStart = frameStart - 8;
      const uint32_t bigMagicN = folly::Endian::big(BIG_FRAME_MAGIC);
      memcpy(bigFrameStart, &bigMagicN, sizeof(bigMagicN));
      const uint64_t bigSizeN = folly::Endian::big<uint64_t>(frameSize);
      memcpy(bigFrameStart + 4, &bigSizeN, sizeof(bigSizeN));
    }

    // Expose the bytes written from the size slot through the padding, then
    // chain the payload onto the header buffer.
    frame->append(frameSize - chainSize + 4);
    frame->prependChain(std::move(buf));
    return frame;
  }

  // ---- Framed transports: 4-byte big-endian length prefix ------------------
  if (clientType_ == THRIFT_FRAMED_DEPRECATED ||
      clientType_ == THRIFT_FRAMED_COMPACT) {
    const uint32_t lengthN =
        folly::Endian::big<uint32_t>(static_cast<uint32_t>(chainSize));
    unique_ptr<IOBuf> prefix = IOBuf::create(sizeof(lengthN));
    prefix->append(sizeof(lengthN));
    memcpy(prefix->writableData(), &lengthN, sizeof(lengthN));
    prefix->prependChain(std::move(buf));
    return prefix;
  }

  // ---- Unframed and HTTP-server transports: payload goes out untouched -----
  if (clientType_ == THRIFT_UNFRAMED_DEPRECATED ||
      clientType_ == THRIFT_UNFRAMED_COMPACT_DEPRECATED ||
      clientType_ == THRIFT_HTTP_SERVER_TYPE) {
    // TODO: IOBufize httpTransport.
    return buf;
  }

  // ---- HTTP client: the parser builds the request around the payload -------
  if (clientType_ == THRIFT_HTTP_CLIENT_TYPE) {
    CHECK(httpClientParser_.get() != nullptr);
    buf = httpClientParser_->constructHeader(
        std::move(buf),
        persistentWriteHeaders,
        writeHeaders_ ? *writeHeaders_ : kEmptyMap(),
        extraWriteHeaders_);
    // The per-message headers have been handed over; drop them.
    writeHeaders_.reset();
    return buf;
  }

  throw TTransportException(
      TTransportException::BAD_ARGS, "Unknown client type");
}