in pulsar/consumer_partition.go [1224:1271]
// processMessageChunk feeds one chunk of a chunked message into the
// reassembly context registered under the message's uuid.
//
// For the first chunk (chunk id 0) a context is registered via addIfAbsent
// with the declared number of chunks and total size. The chunk is then
// accepted only when a context with a non-nil buffer exists and the chunk id
// is exactly one past the last chunk recorded in that context.
//
// Return value:
//   - nil when the chunk is rejected (missing context, missing buffer, or
//     out-of-order chunk id). The context's buffer is cleared if present, the
//     context is removed from the map, and one permit is added.
//   - nil when the chunk was appended but is not the final one; one permit is
//     added.
//   - the context's buffer when the appended chunk is the final one
//     (chunk id == number of chunks - 1).
func (pc *partitionConsumer) processMessageChunk(compressedPayload internal.Buffer,
	msgMeta *pb.MessageMetadata,
	pbMsgID *pb.MessageIdData) internal.Buffer {
	uuid := msgMeta.GetUuid()
	numChunks := msgMeta.GetNumChunksFromMsg()
	totalChunksSize := int(msgMeta.GetTotalChunkMsgSize())
	chunkID := msgMeta.GetChunkId()
	msgID := &messageID{
		ledgerID:     int64(pbMsgID.GetLedgerId()),
		entryID:      int64(pbMsgID.GetEntryId()),
		batchIdx:     -1,
		partitionIdx: pc.partitionIdx,
	}

	// Only the first chunk may create the reassembly context.
	if chunkID == 0 {
		pc.chunkedMsgCtxMap.addIfAbsent(uuid, numChunks, totalChunksSize)
	}

	ctx := pc.chunkedMsgCtxMap.get(uuid)
	if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID != ctx.lastChunkedMsgID+1 {
		// -1 is logged for both values when no context exists for this uuid.
		lastChunkedMsgID := -1
		totalChunks := -1
		if ctx != nil {
			lastChunkedMsgID = int(ctx.lastChunkedMsgID)
			totalChunks = int(ctx.totalChunks)
			// This branch is also reached when the buffer itself is nil, so
			// guard the call to avoid a nil dereference.
			if ctx.chunkedMsgBuffer != nil {
				ctx.chunkedMsgBuffer.Clear()
			}
		}
		// Pass the format and arguments directly: pre-rendering with Sprintf
		// would make Warnf re-interpret any '%' in the rendered text.
		pc.log.Warnf(
			"Received unexpected chunk messageId %s, last-chunk-id %d, chunkId = %d, total-chunks %d",
			msgID.String(), lastChunkedMsgID, chunkID, totalChunks)
		pc.chunkedMsgCtxMap.remove(uuid)
		pc.availablePermits.inc()
		return nil
	}

	ctx.append(chunkID, msgID, compressedPayload)

	// Not the final chunk yet: nothing to hand back to the caller.
	if chunkID != numChunks-1 {
		pc.availablePermits.inc()
		return nil
	}

	return ctx.chunkedMsgBuffer
}