in pulsar/consumer_partition.go [1173:1428]
// MessageReceived handles one message command pushed by the broker.
//
// The raw frame in headersAndPayload is processed in order: broker metadata and
// message metadata are parsed, the payload is decrypted, chunked messages are
// passed through processMessageChunk, the payload is decompressed when the
// metadata carries a positive uncompressed size, and finally every entry of the
// (possibly batched) payload is turned into a *message. The resulting slice is
// sent to pc.queueCh.
//
// Entries that are filtered out (cleared in the response ack set, rejected by
// messageShouldBeDiscarded, or reported as duplicates by the ack grouping
// tracker) are counted and given back as available permits.
//
// A non-nil error is returned when the metadata cannot be read, when decryption
// fails and the configured failure action is Fail or Discard, when
// decompression fails, or when reading an entry from the payload fails. nil is
// returned when the messages were queued, and also when processing stops early
// because processMessageChunk yielded no buffer or the chunk context is gone.
func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
	pbMsgID := response.GetMessageId()

	reader := internal.NewMessageReader(headersAndPayload)
	brokerMetadata, err := reader.ReadBrokerMetadata()
	if err != nil {
		// todo optimize use more appropriate error codes
		pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_BatchDeSerializeError)
		return err
	}
	msgMeta, err := reader.ReadMessageMetadata()
	if err != nil {
		pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_ChecksumMismatch)
		return err
	}

	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
	// error decrypting the payload
	if err != nil {
		// default crypto failure action
		cryptoFailureAction := crypto.ConsumerCryptoFailureActionFail
		if pc.options.decryption != nil {
			cryptoFailureAction = pc.options.decryption.ConsumerCryptoFailureAction
		}

		switch cryptoFailureAction {
		case crypto.ConsumerCryptoFailureActionFail:
			pc.log.Errorf("consuming message failed due to decryption err: %v", err)
			pc.NackID(newTrackingMessageID(int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), 0, 0, 0, nil))
			return err
		case crypto.ConsumerCryptoFailureActionDiscard:
			pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecryptionError)
			return fmt.Errorf("discarding message on decryption error: %w", err)
		case crypto.ConsumerCryptoFailureActionConsume:
			// Hand the still-encrypted payload to the application as a single
			// message, together with the encryption context built from the metadata.
			pc.log.Warnf("consuming encrypted message due to error in decryption: %v", err)
			messages := []*message{
				{
					publishTime:  timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
					eventTime:    timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
					key:          msgMeta.GetPartitionKey(),
					producerName: msgMeta.GetProducerName(),
					properties:   internal.ConvertToStringMap(msgMeta.GetProperties()),
					topic:        pc.topic,
					msgID: newMessageID(
						int64(pbMsgID.GetLedgerId()),
						int64(pbMsgID.GetEntryId()),
						pbMsgID.GetBatchIndex(),
						pc.partitionIdx,
						pbMsgID.GetBatchSize(),
					),
					payLoad:             headersAndPayload.ReadableSlice(),
					schema:              pc.options.schema,
					replicationClusters: msgMeta.GetReplicateTo(),
					replicatedFrom:      msgMeta.GetReplicatedFrom(),
					redeliveryCount:     response.GetRedeliveryCount(),
					encryptionContext:   createEncryptionContext(msgMeta),
					orderingKey:         string(msgMeta.OrderingKey),
				},
			}

			if pc.options.autoReceiverQueueSize {
				pc.incomingMessages.Inc()
				pc.markScaleIfNeed()
			}

			pc.queueCh <- messages
			return nil
		}
	}

	isChunkedMsg := msgMeta.GetNumChunksFromMsg() > 1

	processedPayloadBuffer := internal.NewBufferWrapper(decryptedPayload)
	if isChunkedMsg {
		processedPayloadBuffer = pc.processMessageChunk(processedPayloadBuffer, msgMeta, pbMsgID)
		if processedPayloadBuffer == nil {
			// Nothing to deliver for this chunk (presumably more chunks are
			// still expected - confirm against processMessageChunk).
			return nil
		}
	}

	var uncompressedHeadersAndPayload internal.Buffer
	// decryption is success, decompress the payload, but only if payload is not empty
	if n := msgMeta.UncompressedSize; n != nil && *n > 0 {
		uncompressedHeadersAndPayload, err = pc.Decompress(msgMeta, processedPayloadBuffer)
		if err != nil {
			pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecompressionError)
			return err
		}

		// Reset the reader on the uncompressed buffer
		reader.ResetBuffer(uncompressedHeadersAndPayload)
	}

	numMsgs := 1
	if msgMeta.NumMessagesInBatch != nil {
		numMsgs = int(msgMeta.GetNumMessagesInBatch())
	}

	messages := make([]*message, 0)
	var ackTracker *ackTracker
	// are there multiple messages in this batch?
	if numMsgs > 1 {
		ackTracker = newAckTracker(uint(numMsgs))
	}

	// Convert the ack set carried by the response into a bitset. Entries whose
	// bit is not set are skipped in the loop below.
	var ackSet *bitset.BitSet
	if ackSetFromResponse := response.GetAckSet(); ackSetFromResponse != nil {
		buf := make([]uint64, len(ackSetFromResponse))
		for i := range buf {
			buf[i] = uint64(ackSetFromResponse[i])
		}
		ackSet = bitset.From(buf)
	}

	pc.metrics.MessagesReceived.Add(float64(numMsgs))
	pc.metrics.PrefetchedMessages.Add(float64(numMsgs))

	var skippedMessages int32
	for i := 0; i < numMsgs; i++ {
		smm, payload, err := reader.ReadMessage()
		if err != nil || payload == nil {
			// Note: when payload is nil without an error, the message is
			// discarded and the returned error is nil.
			pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_BatchDeSerializeError)
			return err
		}
		if ackSet != nil && !ackSet.Test(uint(i)) {
			pc.log.Debugf("Ignoring message from %vth message, which has been acknowledged", i)
			skippedMessages++
			continue
		}

		pc.metrics.BytesReceived.Add(float64(len(payload)))
		pc.metrics.PrefetchedBytes.Add(float64(len(payload)))

		trackingMsgID := newTrackingMessageID(
			int64(pbMsgID.GetLedgerId()),
			int64(pbMsgID.GetEntryId()),
			int32(i),
			pc.partitionIdx,
			int32(numMsgs),
			ackTracker)
		// set the consumer so we know how to ack the message id
		trackingMsgID.consumer = pc

		if pc.messageShouldBeDiscarded(trackingMsgID) {
			pc.AckID(trackingMsgID)
			skippedMessages++
			continue
		}

		var msgID MessageID
		if isChunkedMsg {
			ctx := pc.chunkedMsgCtxMap.get(msgMeta.GetUuid())
			if ctx == nil {
				// chunkedMsgCtxMap has closed because of consumer closed
				pc.log.Warnf("get chunkedMsgCtx for chunk with uuid %s failed because consumer has closed",
					msgMeta.Uuid)
				return nil
			}
			cmid := newChunkMessageID(ctx.firstChunkID(), ctx.lastChunkID())
			// set the consumer so we know how to ack the message id
			cmid.consumer = pc
			// clean chunkedMsgCtxMap
			pc.chunkedMsgCtxMap.remove(msgMeta.GetUuid())
			pc.unAckChunksTracker.add(cmid, ctx.chunkedMsgIDs)
			msgID = cmid
		} else {
			msgID = trackingMsgID
		}

		if pc.ackGroupingTracker.isDuplicate(msgID) {
			skippedMessages++
			continue
		}

		var messageIndex *uint64
		var brokerPublishTime *time.Time
		if brokerMetadata != nil {
			if brokerMetadata.Index != nil {
				// The broker index is mapped onto the last entry of the batch,
				// so entry i gets index - numMsgs + i + 1.
				aux := brokerMetadata.GetIndex() - uint64(numMsgs) + uint64(i) + 1
				messageIndex = &aux
			}
			if brokerMetadata.BrokerTimestamp != nil {
				aux := timeFromUnixTimestampMillis(*brokerMetadata.BrokerTimestamp)
				brokerPublishTime = &aux
			}
		}

		// Per-entry metadata (smm), when present, supplies the event time, key,
		// properties and ordering key; otherwise they come from the message metadata.
		var msg *message
		if smm != nil {
			msg = &message{
				publishTime:         timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
				eventTime:           timeFromUnixTimestampMillis(smm.GetEventTime()),
				key:                 smm.GetPartitionKey(),
				producerName:        msgMeta.GetProducerName(),
				properties:          internal.ConvertToStringMap(smm.GetProperties()),
				topic:               pc.topic,
				msgID:               msgID,
				payLoad:             payload,
				schema:              pc.options.schema,
				replicationClusters: msgMeta.GetReplicateTo(),
				replicatedFrom:      msgMeta.GetReplicatedFrom(),
				redeliveryCount:     response.GetRedeliveryCount(),
				schemaVersion:       msgMeta.GetSchemaVersion(),
				schemaInfoCache:     pc.schemaInfoCache,
				orderingKey:         string(smm.OrderingKey),
				index:               messageIndex,
				brokerPublishTime:   brokerPublishTime,
			}
		} else {
			msg = &message{
				publishTime:         timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
				eventTime:           timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
				key:                 msgMeta.GetPartitionKey(),
				producerName:        msgMeta.GetProducerName(),
				properties:          internal.ConvertToStringMap(msgMeta.GetProperties()),
				topic:               pc.topic,
				msgID:               msgID,
				payLoad:             payload,
				schema:              pc.options.schema,
				replicationClusters: msgMeta.GetReplicateTo(),
				replicatedFrom:      msgMeta.GetReplicatedFrom(),
				redeliveryCount:     response.GetRedeliveryCount(),
				schemaVersion:       msgMeta.GetSchemaVersion(),
				schemaInfoCache:     pc.schemaInfoCache,
				orderingKey:         string(msgMeta.GetOrderingKey()),
				index:               messageIndex,
				brokerPublishTime:   brokerPublishTime,
			}
		}

		pc.options.interceptors.BeforeConsume(ConsumerMessage{
			Consumer: pc.parentConsumer,
			Message:  msg,
		})

		messages = append(messages, msg)

		if pc.options.autoReceiverQueueSize {
			// Reserve only this message's own size. Reserving a running batch
			// total on every iteration would charge earlier entries of the
			// batch to the memory limiter again and again.
			pc.client.memLimit.ForceReserveMemory(int64(msg.size()))
			pc.incomingMessages.Add(int32(1))
			pc.markScaleIfNeed()
		}
	}

	// Give back the permits of entries that will never reach the application.
	if skippedMessages > 0 {
		pc.availablePermits.add(skippedMessages)
	}

	// send messages to the dispatcher
	pc.queueCh <- messages
	return nil
}