in lib/ConsumerImpl.cc [422:496]
/**
 * Cache one chunk of a chunked message and, once every chunk has been appended, return the
 * reassembled payload.
 *
 * @param payload the payload carried by this chunk
 * @param metadata metadata of this chunk; supplies the chunk id, the uuid that keys the cache, the
 * number of chunks and the total chunked message size
 * @param messageIdData forwarded unchanged to uncompressMessageIfNeeded()
 * @param cnx the connection passed to increaseAvailablePermits() and uncompressMessageIfNeeded()
 * @param messageId on entry, the id of this chunk; when the message completes it is overwritten with
 * the id built from the first and the last chunk message ids
 * @return the whole payload when the chunked message is complete and uncompressMessageIfNeeded()
 * succeeds, boost::none in every other case (incomplete message, uncached chunk, invalid chunk id,
 * failed uncompress)
 */
boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
                                                                const proto::MessageMetadata& metadata,
                                                                const proto::MessageIdData& messageIdData,
                                                                const ClientConnectionPtr& cnx,
                                                                MessageId& messageId) {
    const auto chunkId = metadata.chunk_id();
    // Bind by reference: `metadata` outlives this call, so no copy of the uuid string is needed.
    const auto& uuid = metadata.uuid();
    LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << uuid
                                                 << ", messageId: " << messageId << ") of "
                                                 << payload.readableBytes() << " bytes");

    Lock lock(chunkProcessMutex_);

    // Lazy task scheduling to expire incomplete chunk message
    bool expected = false;
    if (expireTimeOfIncompleteChunkedMessageMs_ > 0 &&
        expireChunkMessageTaskScheduled_.compare_exchange_strong(expected, true)) {
        triggerCheckExpiredChunkedTimer();
    }

    auto it = chunkedMessageCache_.find(uuid);
    if (chunkId == 0 && it == chunkedMessageCache_.end()) {
        // First chunk of a message that is not cached yet. If the cache already holds the maximum
        // number of pending chunked messages, remove enough entries to leave room for one more and
        // discard every chunk message id those entries were tracking.
        if (maxPendingChunkedMessage_ > 0 && chunkedMessageCache_.size() >= maxPendingChunkedMessage_) {
            chunkedMessageCache_.removeOldestValues(
                chunkedMessageCache_.size() - maxPendingChunkedMessage_ + 1,
                [this](const std::string& removedUuid, const ChunkedMessageCtx& removedCtx) {
                    for (const MessageId& msgId : removedCtx.getChunkedMessageIds()) {
                        discardChunkMessages(removedUuid, msgId, autoAckOldestChunkedMessageOnQueueFull_);
                    }
                });
        }
        it = chunkedMessageCache_.putIfAbsent(
            uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()});
    }

    // The end() check must come before any use of `it->second`: dereferencing the past-the-end
    // iterator is undefined behaviour.
    if (it == chunkedMessageCache_.end()) {
        LOG_ERROR("Received an uncached chunk (uuid: " << uuid << " chunkId: " << chunkId
                                                       << ", messageId: " << messageId << ")");
        lock.unlock();
        increaseAvailablePermits(cnx);
        trackMessage(messageId);
        return boost::none;
    }

    auto& chunkedMsgCtx = it->second;
    if (!chunkedMsgCtx.validateChunkId(chunkId)) {
        LOG_ERROR("Received a chunk whose chunk id is invalid (uuid: "
                  << uuid << " chunkId: " << chunkId << ", messageId: " << messageId << ")");
        // `chunkedMsgCtx` must not be used after this removal.
        chunkedMessageCache_.remove(uuid);
        lock.unlock();
        increaseAvailablePermits(cnx);
        trackMessage(messageId);
        return boost::none;
    }

    chunkedMsgCtx.appendChunk(messageId, payload);
    if (!chunkedMsgCtx.isCompleted()) {
        // More chunks are expected: release the lock and hand a permit back for this chunk.
        lock.unlock();
        increaseAvailablePermits(cnx);
        return boost::none;
    }

    // All chunks have been appended: expose the first and last chunk ids through `messageId`.
    ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
    chunkMsgId->setFirstChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().front());
    chunkMsgId->setLastChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().back());
    messageId = chunkMsgId->build();

    LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx
                                                    << ", sequenceId: " << metadata.sequence_id());

    // Take the buffer out before the context is removed from the cache.
    auto wholePayload = chunkedMsgCtx.getBuffer();
    chunkedMessageCache_.remove(uuid);
    if (uncompressMessageIfNeeded(cnx, messageIdData, metadata, wholePayload, false)) {
        return wholePayload;
    } else {
        return boost::none;
    }
}