in lib/ClientConnection.cc [633:753]
void ClientConnection::processIncomingBuffer() {
// Process all the available frames from the incoming buffer
while (incomingBuffer_.readableBytes() >= sizeof(uint32_t)) {
// Extract message frames from incoming buffer
// At this point we have at least 4 bytes in the buffer
uint32_t frameSize = incomingBuffer_.readUnsignedInt();
if (frameSize > incomingBuffer_.readableBytes()) {
// We don't have the entire frame yet
const uint32_t bytesToReceive = frameSize - incomingBuffer_.readableBytes();
// Rollback the reading of frameSize (when the frame will be complete,
// we'll read it again
incomingBuffer_.rollback(sizeof(uint32_t));
if (bytesToReceive <= incomingBuffer_.writableBytes()) {
// The rest of the frame still fits in the current buffer
asyncReceive(incomingBuffer_.asio_buffer(),
customAllocReadHandler(std::bind(&ClientConnection::handleRead,
shared_from_this(), std::placeholders::_1,
std::placeholders::_2, bytesToReceive)));
return;
} else {
// Need to allocate a buffer big enough for the frame
uint32_t newBufferSize = std::max<uint32_t>(DefaultBufferSize, frameSize + sizeof(uint32_t));
incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, newBufferSize);
asyncReceive(incomingBuffer_.asio_buffer(),
customAllocReadHandler(std::bind(&ClientConnection::handleRead,
shared_from_this(), std::placeholders::_1,
std::placeholders::_2, bytesToReceive)));
return;
}
}
// At this point, we have at least one complete frame available in the buffer
uint32_t cmdSize = incomingBuffer_.readUnsignedInt();
proto::BaseCommand incomingCmd;
if (!incomingCmd.ParseFromArray(incomingBuffer_.data(), cmdSize)) {
LOG_ERROR(cnxString_ << "Error parsing protocol buffer command");
close();
return;
}
incomingBuffer_.consume(cmdSize);
if (incomingCmd.type() == BaseCommand::MESSAGE) {
// Parse message metadata and extract payload
proto::MessageMetadata msgMetadata;
proto::BrokerEntryMetadata brokerEntryMetadata;
// read checksum
uint32_t remainingBytes = frameSize - (cmdSize + 4);
auto readerIndex = incomingBuffer_.readerIndex();
if (incomingBuffer_.readUnsignedShort() == Commands::magicBrokerEntryMetadata) {
// broker entry metadata is present
uint32_t brokerEntryMetadataSize = incomingBuffer_.readUnsignedInt();
if (!brokerEntryMetadata.ParseFromArray(incomingBuffer_.data(), brokerEntryMetadataSize)) {
LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd.message().consumer_id()
<< ", message ledger id "
<< incomingCmd.message().message_id().ledgerid() << ", entry id "
<< incomingCmd.message().message_id().entryid()
<< "] Error parsing broker entry metadata");
close();
return;
}
incomingBuffer_.setReaderIndex(readerIndex + 2 + 4 + brokerEntryMetadataSize);
remainingBytes -= (2 + 4 + brokerEntryMetadataSize);
} else {
incomingBuffer_.setReaderIndex(readerIndex);
}
bool isChecksumValid = verifyChecksum(incomingBuffer_, remainingBytes, incomingCmd);
uint32_t metadataSize = incomingBuffer_.readUnsignedInt();
if (!msgMetadata.ParseFromArray(incomingBuffer_.data(), metadataSize)) {
LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd.message().consumer_id() //
<< ", message ledger id "
<< incomingCmd.message().message_id().ledgerid() //
<< ", entry id " << incomingCmd.message().message_id().entryid()
<< "] Error parsing message metadata");
close();
return;
}
incomingBuffer_.consume(metadataSize);
remainingBytes -= (4 + metadataSize);
uint32_t payloadSize = remainingBytes;
SharedBuffer payload = SharedBuffer::copy(incomingBuffer_.data(), payloadSize);
incomingBuffer_.consume(payloadSize);
handleIncomingMessage(incomingCmd.message(), isChecksumValid, brokerEntryMetadata, msgMetadata,
payload);
} else {
handleIncomingCommand(incomingCmd);
}
}
if (incomingBuffer_.readableBytes() > 0) {
// We still have 1 to 3 bytes from the next frame
assert(incomingBuffer_.readableBytes() < sizeof(uint32_t));
// Restart with a new buffer and copy the few bytes at the beginning
incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, DefaultBufferSize);
// At least we need to read 4 bytes to have the complete frame size
uint32_t minReadSize = sizeof(uint32_t) - incomingBuffer_.readableBytes();
asyncReceive(
incomingBuffer_.asio_buffer(),
customAllocReadHandler(std::bind(&ClientConnection::handleRead, shared_from_this(),
std::placeholders::_1, std::placeholders::_2, minReadSize)));
return;
}
// We have read everything we had in the buffer
// Rollback the indexes to reuse the same buffer
incomingBuffer_.reset();
readNextCommand();
}