func()

in pulsar/consumer_partition.go [1224:1271]


// processMessageChunk appends one chunk of a chunked message to the
// reassembly context stored in pc.chunkedMsgCtxMap under the message UUID.
//
// A context is registered (if not already present) when the chunk with id 0
// arrives. Every chunk must carry the id immediately following the last one
// recorded in the context; otherwise the chunk is treated as unexpected: the
// context's buffer is cleared, the context is removed from the map, a warning
// is logged and nil is returned.
//
// Parameters:
//   - compressedPayload: payload of this chunk, handed to the context as-is.
//   - msgMeta: metadata carrying the UUID, chunk id, chunk count and total size.
//   - pbMsgID: wire message id, used to build the per-chunk messageID.
//
// Returns the context's buffer once the final chunk (id == numChunks-1) has
// been appended, and nil in every other case. Each nil return is accompanied
// by pc.availablePermits.inc().
func (pc *partitionConsumer) processMessageChunk(compressedPayload internal.Buffer,
	msgMeta *pb.MessageMetadata,
	pbMsgID *pb.MessageIdData) internal.Buffer {
	uuid := msgMeta.GetUuid()
	numChunks := msgMeta.GetNumChunksFromMsg()
	totalChunksSize := int(msgMeta.GetTotalChunkMsgSize())
	chunkID := msgMeta.GetChunkId()
	msgID := &messageID{
		ledgerID:     int64(pbMsgID.GetLedgerId()),
		entryID:      int64(pbMsgID.GetEntryId()),
		batchIdx:     -1,
		partitionIdx: pc.partitionIdx,
	}

	// Only the first chunk may open a new reassembly context.
	if chunkID == 0 {
		pc.chunkedMsgCtxMap.addIfAbsent(uuid,
			numChunks,
			totalChunksSize,
		)
	}

	ctx := pc.chunkedMsgCtxMap.get(uuid)

	if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID != ctx.lastChunkedMsgID+1 {
		lastChunkedMsgID := -1
		totalChunks := -1
		if ctx != nil {
			lastChunkedMsgID = int(ctx.lastChunkedMsgID)
			totalChunks = int(ctx.totalChunks)
			// This branch is also reached when the context exists but its
			// buffer is nil, so the buffer must be checked before clearing.
			if ctx.chunkedMsgBuffer != nil {
				ctx.chunkedMsgBuffer.Clear()
			}
		}
		// Pass the format and arguments straight to Warnf: pre-formatting with
		// Sprintf would make the rendered text be re-parsed as a format string.
		pc.log.Warnf(
			"Received unexpected chunk messageId %s, last-chunk-id %d, chunkId = %d, total-chunks %d",
			msgID.String(), lastChunkedMsgID, chunkID, totalChunks)
		pc.chunkedMsgCtxMap.remove(uuid)
		pc.availablePermits.inc()
		return nil
	}

	ctx.append(chunkID, msgID, compressedPayload)

	// More chunks are still expected; nothing to hand back yet.
	if chunkID != numChunks-1 {
		pc.availablePermits.inc()
		return nil
	}

	return ctx.chunkedMsgBuffer
}