// Excerpt from pulsar/consumer_partition.go (original lines 1173-1428).

// MessageReceived handles a CommandMessage pushed by the broker for this
// partition. It parses the optional broker metadata and the message metadata
// from headersAndPayload, decrypts the payload, reassembles chunked messages,
// decompresses when needed, expands batched payloads into individual
// *message values, and finally delivers them to the dispatcher via pc.queueCh.
// Messages that fail metadata parsing, decryption (with the Discard action),
// or decompression are discarded with a matching ack validation error.
func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
	pbMsgID := response.GetMessageId()

	reader := internal.NewMessageReader(headersAndPayload)
	brokerMetadata, err := reader.ReadBrokerMetadata()
	if err != nil {
		// todo optimize use more appropriate error codes
		pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_BatchDeSerializeError)
		return err
	}
	msgMeta, err := reader.ReadMessageMetadata()
	if err != nil {
		pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_ChecksumMismatch)
		return err
	}
	decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
	// error decrypting the payload
	if err != nil {
		// default crypto failure action
		cryptoFailureAction := crypto.ConsumerCryptoFailureActionFail
		if pc.options.decryption != nil {
			cryptoFailureAction = pc.options.decryption.ConsumerCryptoFailureAction
		}

		switch cryptoFailureAction {
		case crypto.ConsumerCryptoFailureActionFail:
			// Fail: report the error and negatively acknowledge so the broker
			// can redeliver the message later.
			pc.log.Errorf("consuming message failed due to decryption err: %v", err)
			pc.NackID(newTrackingMessageID(int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), 0, 0, 0, nil))
			return err
		case crypto.ConsumerCryptoFailureActionDiscard:
			// Discard: acknowledge the message as corrupted so it is dropped
			// and never redelivered.
			pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecryptionError)
			return fmt.Errorf("discarding message on decryption error: %w", err)
		case crypto.ConsumerCryptoFailureActionConsume:
			// Consume: hand the still-encrypted payload to the application,
			// attaching the encryption context so the caller can decrypt it
			// out-of-band. Note the payload here is the raw (undecrypted)
			// readable slice, and batch expansion is skipped.
			pc.log.Warnf("consuming encrypted message due to error in decryption: %v", err)
			messages := []*message{
				{
					publishTime:  timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
					eventTime:    timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
					key:          msgMeta.GetPartitionKey(),
					producerName: msgMeta.GetProducerName(),
					properties:   internal.ConvertToStringMap(msgMeta.GetProperties()),
					topic:        pc.topic,
					msgID: newMessageID(
						int64(pbMsgID.GetLedgerId()),
						int64(pbMsgID.GetEntryId()),
						pbMsgID.GetBatchIndex(),
						pc.partitionIdx,
						pbMsgID.GetBatchSize(),
					),
					payLoad:             headersAndPayload.ReadableSlice(),
					schema:              pc.options.schema,
					replicationClusters: msgMeta.GetReplicateTo(),
					replicatedFrom:      msgMeta.GetReplicatedFrom(),
					redeliveryCount:     response.GetRedeliveryCount(),
					encryptionContext:   createEncryptionContext(msgMeta),
					orderingKey:         string(msgMeta.OrderingKey),
				},
			}
			// Track queue growth when the receiver queue auto-scales.
			if pc.options.autoReceiverQueueSize {
				pc.incomingMessages.Inc()
				pc.markScaleIfNeed()
			}

			pc.queueCh <- messages
			return nil
		}
	}

	// A message is chunked when the producer split a single large payload
	// across multiple entries (NumChunksFromMsg > 1).
	isChunkedMsg := false
	if msgMeta.GetNumChunksFromMsg() > 1 {
		isChunkedMsg = true
	}

	processedPayloadBuffer := internal.NewBufferWrapper(decryptedPayload)
	if isChunkedMsg {
		// Accumulate this chunk; a nil result means the full message is not
		// yet assembled (or the chunk was not accepted), so there is nothing
		// to dispatch for this entry.
		processedPayloadBuffer = pc.processMessageChunk(processedPayloadBuffer, msgMeta, pbMsgID)
		if processedPayloadBuffer == nil {
			return nil
		}
	}

	var uncompressedHeadersAndPayload internal.Buffer
	// decryption is success, decompress the payload, but only if payload is not empty
	if n := msgMeta.UncompressedSize; n != nil && *n > 0 {
		uncompressedHeadersAndPayload, err = pc.Decompress(msgMeta, processedPayloadBuffer)
		if err != nil {
			pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecompressionError)
			return err
		}

		// Reset the reader on the uncompressed buffer
		reader.ResetBuffer(uncompressedHeadersAndPayload)
	}

	// A batched entry carries NumMessagesInBatch inner messages; otherwise
	// the entry is a single message.
	numMsgs := 1
	if msgMeta.NumMessagesInBatch != nil {
		numMsgs = int(msgMeta.GetNumMessagesInBatch())
	}

	messages := make([]*message, 0)
	var ackTracker *ackTracker
	// are there multiple messages in this batch?
	if numMsgs > 1 {
		// One shared tracker so the batch entry is acked only after every
		// inner message has been acked.
		ackTracker = newAckTracker(uint(numMsgs))
	}

	// The broker may include an ack set: a bitset over the batch where a
	// cleared bit marks an inner message that was already acknowledged and
	// must not be redelivered to the application.
	var ackSet *bitset.BitSet
	if response.GetAckSet() != nil {
		ackSetFromResponse := response.GetAckSet()
		buf := make([]uint64, len(ackSetFromResponse))
		for i := 0; i < len(buf); i++ {
			buf[i] = uint64(ackSetFromResponse[i])
		}
		ackSet = bitset.From(buf)
	}

	pc.metrics.MessagesReceived.Add(float64(numMsgs))
	pc.metrics.PrefetchedMessages.Add(float64(numMsgs))

	var (
		bytesReceived   int
		skippedMessages int32
	)
	// Expand the (possibly batched) entry into individual messages.
	for i := 0; i < numMsgs; i++ {
		smm, payload, err := reader.ReadMessage()
		if err != nil || payload == nil {
			pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_BatchDeSerializeError)
			return err
		}
		// Skip inner messages whose ack-set bit is cleared (already acked).
		if ackSet != nil && !ackSet.Test(uint(i)) {
			pc.log.Debugf("Ignoring message from %vth message, which has been acknowledged", i)
			skippedMessages++
			continue
		}

		pc.metrics.BytesReceived.Add(float64(len(payload)))
		pc.metrics.PrefetchedBytes.Add(float64(len(payload)))

		trackingMsgID := newTrackingMessageID(
			int64(pbMsgID.GetLedgerId()),
			int64(pbMsgID.GetEntryId()),
			int32(i),
			pc.partitionIdx,
			int32(numMsgs),
			ackTracker)
		// set the consumer so we know how to ack the message id
		trackingMsgID.consumer = pc

		// Ack-and-skip messages that should not be delivered (e.g. filtered
		// by messageShouldBeDiscarded's criteria).
		if pc.messageShouldBeDiscarded(trackingMsgID) {
			pc.AckID(trackingMsgID)
			skippedMessages++
			continue
		}

		var msgID MessageID
		if isChunkedMsg {
			// For a reassembled chunked message, the deliverable ID spans the
			// first and last chunk entries.
			ctx := pc.chunkedMsgCtxMap.get(msgMeta.GetUuid())
			if ctx == nil {
				// chunkedMsgCtxMap has closed because of consumer closed
				pc.log.Warnf("get chunkedMsgCtx for chunk with uuid %s failed because consumer has closed",
					msgMeta.Uuid)
				return nil
			}
			cmid := newChunkMessageID(ctx.firstChunkID(), ctx.lastChunkID())
			// set the consumer so we know how to ack the message id
			cmid.consumer = pc
			// clean chunkedMsgCtxMap
			pc.chunkedMsgCtxMap.remove(msgMeta.GetUuid())
			// Remember every chunk's ID so they can all be acked when the
			// application acks the assembled message.
			pc.unAckChunksTracker.add(cmid, ctx.chunkedMsgIDs)
			msgID = cmid
		} else {
			msgID = trackingMsgID
		}

		// Drop messages already pending in the ack-grouping tracker.
		if pc.ackGroupingTracker.isDuplicate(msgID) {
			skippedMessages++
			continue
		}

		var messageIndex *uint64
		var brokerPublishTime *time.Time
		if brokerMetadata != nil {
			if brokerMetadata.Index != nil {
				// The broker index refers to the last message of the batch;
				// derive this inner message's index from its batch position.
				aux := brokerMetadata.GetIndex() - uint64(numMsgs) + uint64(i) + 1
				messageIndex = &aux
			}
			if brokerMetadata.BrokerTimestamp != nil {
				aux := timeFromUnixTimestampMillis(*brokerMetadata.BrokerTimestamp)
				brokerPublishTime = &aux
			}
		}

		// Per-entry fields (event time, key, properties, ordering key) come
		// from the single-message metadata (smm) when this is a batch entry,
		// otherwise from the top-level message metadata.
		var msg *message
		if smm != nil {
			msg = &message{
				publishTime:         timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
				eventTime:           timeFromUnixTimestampMillis(smm.GetEventTime()),
				key:                 smm.GetPartitionKey(),
				producerName:        msgMeta.GetProducerName(),
				properties:          internal.ConvertToStringMap(smm.GetProperties()),
				topic:               pc.topic,
				msgID:               msgID,
				payLoad:             payload,
				schema:              pc.options.schema,
				replicationClusters: msgMeta.GetReplicateTo(),
				replicatedFrom:      msgMeta.GetReplicatedFrom(),
				redeliveryCount:     response.GetRedeliveryCount(),
				schemaVersion:       msgMeta.GetSchemaVersion(),
				schemaInfoCache:     pc.schemaInfoCache,
				orderingKey:         string(smm.OrderingKey),
				index:               messageIndex,
				brokerPublishTime:   brokerPublishTime,
			}
		} else {
			msg = &message{
				publishTime:         timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
				eventTime:           timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
				key:                 msgMeta.GetPartitionKey(),
				producerName:        msgMeta.GetProducerName(),
				properties:          internal.ConvertToStringMap(msgMeta.GetProperties()),
				topic:               pc.topic,
				msgID:               msgID,
				payLoad:             payload,
				schema:              pc.options.schema,
				replicationClusters: msgMeta.GetReplicateTo(),
				replicatedFrom:      msgMeta.GetReplicatedFrom(),
				redeliveryCount:     response.GetRedeliveryCount(),
				schemaVersion:       msgMeta.GetSchemaVersion(),
				schemaInfoCache:     pc.schemaInfoCache,
				orderingKey:         string(msgMeta.GetOrderingKey()),
				index:               messageIndex,
				brokerPublishTime:   brokerPublishTime,
			}
		}

		// Run interceptors before the message becomes visible to the app.
		pc.options.interceptors.BeforeConsume(ConsumerMessage{
			Consumer: pc.parentConsumer,
			Message:  msg,
		})
		messages = append(messages, msg)
		bytesReceived += msg.size()
		if pc.options.autoReceiverQueueSize {
			pc.client.memLimit.ForceReserveMemory(int64(bytesReceived))
			pc.incomingMessages.Add(int32(1))
			pc.markScaleIfNeed()
		}
	}

	// Skipped messages consumed permits the app will never see; give the
	// permits back so flow control keeps requesting from the broker.
	if skippedMessages > 0 {
		pc.availablePermits.add(skippedMessages)
	}

	// send messages to the dispatcher
	pc.queueCh <- messages
	return nil
}