in pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java [1378:1525]
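// Handles a single CommandMessage received from the broker: verifies the checksum,
// parses broker/message metadata, decrypts and uncompresses the payload, then
// dispatches it as a single message, a message chunk, or an unpacked batch.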
void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, ClientCnx cnx) {
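// The ack set is a bit set (encoded as a list of longs) marking which messages of a
// batch entry are still unacknowledged; it stays empty for non-batch messages.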
List<Long> ackSet = Collections.emptyList();
if (cmdMessage.getAckSetsCount() > 0) {
ackSet = new ArrayList<>(cmdMessage.getAckSetsCount());
for (int i = 0; i < cmdMessage.getAckSetsCount(); i++) {
ackSet.add(cmdMessage.getAckSetAt(i));
}
}
int redeliveryCount = cmdMessage.getRedeliveryCount();
MessageIdData messageId = cmdMessage.getMessageId();
long consumerEpoch = DEFAULT_CONSUMER_EPOCH;
// If the broker sent this message with a consumerEpoch, propagate it to the message.
if (cmdMessage.hasConsumerEpoch()) {
consumerEpoch = cmdMessage.getConsumerEpoch();
}
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received message: {}/{}", topic, subscription, messageId.getLedgerId(),
messageId.getEntryId());
}
if (!verifyChecksum(headersAndPayload, messageId)) {
// discard message with checksum error
discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
return;
}
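// The broker entry metadata (when present) precedes the message metadata in the buffer;
// a failure to parse either is treated like a checksum mismatch and the message is
// discarded.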
BrokerEntryMetadata brokerEntryMetadata;
MessageMetadata msgMetadata;
try {
brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(headersAndPayload);
msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
} catch (Throwable t) {
discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
return;
}
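// numMessages > 1 indicates a batch entry, while numChunks > 1 indicates one large
// logical message split across entries; batching and chunking are mutually exclusive.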
final int numMessages = msgMetadata.getNumMessagesInBatch();
final int numChunks = msgMetadata.hasNumChunksFromMsg() ? msgMetadata.getNumChunksFromMsg() : 0;
final boolean isChunkedMessage = numChunks > 1;
MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex());
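// Fast path for non-batch messages: if this id is already pending acknowledgment,
// drop the duplicate but still return the permits to the broker.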
if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()
&& acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Ignoring message as it was already being acked earlier by same consumer {}/{}",
topic, subscription, consumerName, msgId);
}
increaseAvailablePermits(cnx, numMessages);
return;
}
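// A null result means the message will not be delivered from here: depending on the
// configured ConsumerCryptoFailureAction it was discarded or left for redelivery.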
ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, redeliveryCount, msgMetadata, headersAndPayload,
cnx);
boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata);
if (decryptedPayload == null) {
// Message was discarded, or no CryptoKeyReader is available to decrypt it
return;
}
// Uncompress the payload and release the decrypted ByteBuf. Undecryptable payloads are
// passed through as-is, and chunked messages are decompressed only after all chunks
// have been stitched together.
ByteBuf uncompressedPayload = (isMessageUndecryptable || isChunkedMessage) ? decryptedPayload.retain()
: uncompressPayloadIfNeeded(messageId, msgMetadata, decryptedPayload, cnx, true);
decryptedPayload.release();
if (uncompressedPayload == null) {
// Message was discarded on decompression error
return;
}
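// If a payload processor is configured, it takes over turning the raw payload into
// messages (e.g. for custom batch formats) and this method's normal dispatch is skipped.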
if (conf.getPayloadProcessor() != null) {
// processPayloadByProcessor() releases uncompressedPayload, so no extra release() is needed here
processPayloadByProcessor(brokerEntryMetadata, msgMetadata,
uncompressedPayload, msgId, schema, redeliveryCount, ackSet, consumerEpoch);
return;
}
// An undecryptable message cannot be parsed as a batch, so attach the EncryptionContext
// and deliver the still-encrypted payload as a single message
if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {
// Right now, chunked messages are only supported by non-shared subscriptions
if (isChunkedMessage) {
uncompressedPayload = processMessageChunk(uncompressedPayload, msgMetadata, msgId, messageId, cnx);
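// processMessageChunk buffers intermediate chunks and returns null until the final
// chunk arrives, at which point it returns the stitched, uncompressed payload.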
if (uncompressedPayload == null) {
return;
}
// Last chunk received: stitch the chunks into one payload and clean up the chunk buffer
if (log.isDebugEnabled()) {
log.debug("Chunked message completed chunkId {}, total-chunks {}, msgId {} sequenceId {}",
msgMetadata.getChunkId(), msgMetadata.getNumChunksFromMsg(), msgId,
msgMetadata.getSequenceId());
}
// remove buffer from the map, set the chunk message id
ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.remove(msgMetadata.getUuid());
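// A ChunkMessageIdImpl spans the first and last chunk, so the application-visible
// message id covers the whole logical message.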
if (chunkedMsgCtx.chunkedMessageIds.length > 0) {
msgId = new ChunkMessageIdImpl(chunkedMsgCtx.chunkedMessageIds[0],
chunkedMsgCtx.chunkedMessageIds[chunkedMsgCtx.chunkedMessageIds.length - 1]);
}
// add chunked messageId to unack-message tracker, and reduce pending-chunked-message count
unAckedChunkedMessageIdSequenceMap.put(msgId, chunkedMsgCtx.chunkedMessageIds);
pendingChunkedMessageCount--;
chunkedMsgCtx.recycle();
}
// If the topic is non-persistent, we should not ignore any messages.
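// (isPriorEntryIndex is expected to take the resetIncludeHead flag into account, i.e.
// whether the startMessageId entry itself is delivered or skipped.)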
if (this.topicName.isPersistent() && isSameEntry(msgId) && isPriorEntryIndex(messageId.getEntryId())) {
// We need to discard entries that were prior to startMessageId
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
consumerName, startMessageId);
}
uncompressedPayload.release();
return;
}
final MessageImpl<T> message =
newMessage(msgId, brokerEntryMetadata, msgMetadata, uncompressedPayload,
schema, redeliveryCount, consumerEpoch);
uncompressedPayload.release();
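// Dead letter handling: at the configured max redelivery count the message is recorded
// as a DLQ candidate but still delivered once more; beyond it, the message is handed
// back for redelivery, which gives the dead letter routing a chance to take it.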
if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null) {
if (redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
Collections.singletonList(message));
if (redeliveryCount > deadLetterPolicy.getMaxRedeliverCount()) {
redeliverUnacknowledgedMessages(Collections.singleton(message.getMessageId()));
// The message is skipped due to reaching the max redelivery count,
// so we need to increase the available permits
increaseAvailablePermits(cnx);
return;
}
}
}
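// Complete a pending receive() if one is waiting; otherwise enqueue the message into
// the incoming queue for later consumption.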
executeNotifyCallback(message);
} else {
// Batch entry: the uncompressed payload holds every message in the batch; unpack them
// and enqueue each one individually
receiveIndividualMessagesFromBatch(brokerEntryMetadata, msgMetadata, redeliveryCount, ackSet,
uncompressedPayload, messageId, cnx, consumerEpoch);
uncompressedPayload.release();
}
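// If a MessageListener is registered, trigger it now that new messages may be available.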
tryTriggerListener();
}