in lib/ConsumerImpl.cc [457:536]
// Processes one chunk of a chunked message.
//
// Partially received messages are kept in chunkedMessageCache_, keyed by the uuid carried in
// `metadata`. The whole function body up to the early returns runs under chunkProcessMutex_.
//
// @param payload        the bytes of this chunk
// @param metadata       supplies chunk_id, uuid, num_chunks_from_msg, total_chunk_msg_size and
//                       sequence_id of the chunk
// @param messageIdData  forwarded to uncompressMessageIfNeeded() once the message is complete
// @param cnx            connection passed to increaseAvailablePermits() and
//                       uncompressMessageIfNeeded()
// @param messageId      in: id of this chunk; out: when the message is completed it is replaced
//                       by the id built from ChunkMessageIdImpl over all chunk ids
// @return the payload obtained from the chunk context after uncompressMessageIfNeeded() succeeded,
//         or boost::none when the chunk was rejected, more chunks are still expected, or
//         uncompressMessageIfNeeded() returned false
boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
                                                                const proto::MessageMetadata& metadata,
                                                                const proto::MessageIdData& messageIdData,
                                                                const ClientConnectionPtr& cnx,
                                                                MessageId& messageId) {
    const auto chunkId = metadata.chunk_id();
    // Bind by reference: `metadata` outlives this call, so there is no need to copy the uuid for
    // every single chunk.
    const auto& uuid = metadata.uuid();
    LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << uuid
                                                 << ", messageId: " << messageId << ") of "
                                                 << payload.readableBytes() << " bytes");

    Lock lock(chunkProcessMutex_);

    // Lazy task scheduling to expire incomplete chunk message
    bool expected = false;
    if (expireTimeOfIncompleteChunkedMessageMs_ > 0 &&
        expireChunkMessageTaskScheduled_.compare_exchange_strong(expected, true)) {
        triggerCheckExpiredChunkedTimer();
    }

    auto it = chunkedMessageCache_.find(uuid);

    if (chunkId == 0 && it == chunkedMessageCache_.end()) {
        // First chunk of a message that is not cached yet: make room if the cache already holds
        // maxPendingChunkedMessage_ entries, then register a new context for this uuid.
        if (maxPendingChunkedMessage_ > 0 && chunkedMessageCache_.size() >= maxPendingChunkedMessage_) {
            chunkedMessageCache_.removeOldestValues(
                chunkedMessageCache_.size() - maxPendingChunkedMessage_ + 1,
                [this](const std::string& uuid, const ChunkedMessageCtx& ctx) {
                    for (const MessageId& msgId : ctx.getChunkedMessageIds()) {
                        discardChunkMessages(uuid, msgId, autoAckOldestChunkedMessageOnQueueFull_);
                    }
                });
        }
        it = chunkedMessageCache_.putIfAbsent(
            uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()});
    }

    // The iterator must be compared against end() BEFORE it is dereferenced: a non-first chunk whose
    // uuid is not cached leaves `it` past-the-end, and touching `it->second` then is undefined
    // behaviour.
    const bool isCached = (it != chunkedMessageCache_.end());
    if (!isCached || !it->second.validateChunkId(chunkId)) {
        auto startMessageId = startMessageId_.get().value_or(MessageId::earliest());
        if (!config_.isStartMessageIdInclusive() && startMessageId.ledgerId() == messageId.ledgerId() &&
            startMessageId.entryId() == messageId.entryId()) {
            // When the start message id is not inclusive, the last chunk of the previous chunked message will
            // be delivered, which is expected and we only need to filter it out.
            chunkedMessageCache_.remove(uuid);
            LOG_INFO("Filtered the chunked message before the start message id (uuid: "
                     << uuid << " chunkId: " << chunkId << ", messageId: " << messageId << ")");
        } else if (!isCached) {
            LOG_ERROR("Received an uncached chunk (uuid: " << uuid << " chunkId: " << chunkId
                                                           << ", messageId: " << messageId << ")");
        } else {
            LOG_ERROR("Received a chunk whose chunk id is invalid (uuid: "
                      << uuid << " chunkId: " << chunkId << ", messageId: " << messageId << ")");
            chunkedMessageCache_.remove(uuid);
        }
        // Release the mutex before calling back into the consumer.
        lock.unlock();
        increaseAvailablePermits(cnx);
        trackMessage(messageId);
        return boost::none;
    }

    // Safe to dereference now: the guard above returned for the past-the-end case.
    auto& chunkedMsgCtx = it->second;
    chunkedMsgCtx.appendChunk(messageId, payload);
    if (!chunkedMsgCtx.isCompleted()) {
        // More chunks are expected; nothing to hand to the caller yet.
        lock.unlock();
        increaseAvailablePermits(cnx);
        return boost::none;
    }

    // All chunks received: expose the id that covers every chunk of the message.
    messageId = std::make_shared<ChunkMessageIdImpl>(chunkedMsgCtx.moveChunkedMessageIds())->build();

    LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx
                                                    << ", sequenceId: " << metadata.sequence_id());

    // Take the buffer out before removing the cache entry; `chunkedMsgCtx` refers into the cache and
    // must not be used after remove().
    auto wholePayload = chunkedMsgCtx.getBuffer();
    chunkedMessageCache_.remove(uuid);
    if (uncompressMessageIfNeeded(cnx, messageIdData, metadata, wholePayload, false)) {
        return wholePayload;
    } else {
        return boost::none;
    }
}