void messageReceived()

in pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java [1378:1525]


    /**
     * Processes one message command received on a connection: validates it, decodes its payload and
     * then either dispatches the resulting message(s) or discards them.
     *
     * <p>Processing order:
     * <ol>
     *   <li>Copy the ack sets, redelivery count, message id and (if present) consumer epoch from the command.</li>
     *   <li>Verify the checksum and parse the broker-entry and message metadata; on failure the message is
     *       handed to {@code discardCorruptedMessage} and processing stops.</li>
     *   <li>Drop non-batch messages that the acknowledgment tracker reports as duplicates, giving their
     *       permits back.</li>
     *   <li>Decrypt and, unless the payload is undecryptable or a chunk, decompress the payload.</li>
     *   <li>If a payload processor is configured, delegate everything else to it.</li>
     *   <li>Otherwise handle the payload as a single message (including chunk assembly, start-message-id
     *       filtering and dead-letter bookkeeping) or as a batch.</li>
     *   <li>Finally call {@code tryTriggerListener()}.</li>
     * </ol>
     *
     * @param cmdMessage        the message command; supplies message id, redelivery count, ack sets and
     *                          optional consumer epoch
     * @param headersAndPayload buffer holding the metadata headers followed by the message payload
     * @param cnx               the connection the command arrived on; used when discarding messages and
     *                          when returning permits
     */
    void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, ClientCnx cnx) {
        // Copy the ack sets carried by the command; stays an empty list when there are none.
        final int ackSetsCount = cmdMessage.getAckSetsCount();
        List<Long> ackSet = Collections.emptyList();
        if (ackSetsCount > 0) {
            ackSet = new ArrayList<>(ackSetsCount);
            for (int i = 0; i < ackSetsCount; i++) {
                ackSet.add(cmdMessage.getAckSetAt(i));
            }
        }
        int redeliveryCount = cmdMessage.getRedeliveryCount();
        MessageIdData messageId = cmdMessage.getMessageId();
        long consumerEpoch = DEFAULT_CONSUMER_EPOCH;
        // if broker send messages to client with consumerEpoch, we should set consumerEpoch to message
        if (cmdMessage.hasConsumerEpoch()) {
            consumerEpoch = cmdMessage.getConsumerEpoch();
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Received message: {}/{}", topic, subscription, messageId.getLedgerId(),
                    messageId.getEntryId());
        }

        if (!verifyChecksum(headersAndPayload, messageId)) {
            // discard message with checksum error
            discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
            return;
        }

        BrokerEntryMetadata brokerEntryMetadata;
        MessageMetadata msgMetadata;
        try {
            brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(headersAndPayload);
            msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
        } catch (Throwable t) {
            // Record the cause here: discardCorruptedMessage is only given the validation error code, so
            // without this log line the reason the metadata could not be parsed would be lost.
            log.warn("[{}][{}] Failed to parse metadata of message {}/{}, discarding it", topic, subscription,
                    messageId.getLedgerId(), messageId.getEntryId(), t);
            // Metadata parse failures are passed on with the same code as a checksum mismatch.
            discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
            return;
        }

        final int numMessages = msgMetadata.getNumMessagesInBatch();
        final int numChunks = msgMetadata.hasNumChunksFromMsg() ? msgMetadata.getNumChunksFromMsg() : 0;
        final boolean isChunkedMessage = numChunks > 1;
        MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex());
        // Only non-batch messages (no batch-size field in the metadata) are checked against the ack tracker.
        if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()
                && acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Ignoring message as it was already being acked earlier by same consumer {}/{}",
                        topic, subscription, consumerName, msgId);
            }

            // The duplicate is dropped, so its permits are given back.
            increaseAvailablePermits(cnx, numMessages);
            return;
        }

        ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, redeliveryCount, msgMetadata, headersAndPayload,
                cnx);

        boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata);

        if (decryptedPayload == null) {
            // Message was discarded or CryptoKeyReader isn't implemented
            return;
        }

        // uncompress decryptedPayload and release decryptedPayload-ByteBuf.
        // Undecryptable and chunked payloads are not decompressed here: the buffer is retained once so that
        // it stays usable as uncompressedPayload after the release() just below.
        ByteBuf uncompressedPayload = (isMessageUndecryptable || isChunkedMessage) ? decryptedPayload.retain()
                : uncompressPayloadIfNeeded(messageId, msgMetadata, decryptedPayload, cnx, true);
        decryptedPayload.release();
        if (uncompressedPayload == null) {
            // Message was discarded on decompression error
            return;
        }

        if (conf.getPayloadProcessor() != null) {
            // uncompressedPayload is released in this method so we don't need to call release() again
            processPayloadByProcessor(brokerEntryMetadata, msgMetadata,
                    uncompressedPayload, msgId, schema, redeliveryCount, ackSet, consumerEpoch);
            return;
        }

        // if message is not decryptable then it can't be parsed as a batch-message. so, add EncryptionCtx to message
        // and return undecrypted payload
        if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {

            // right now, chunked messages are only supported by non-shared subscription
            if (isChunkedMessage) {
                // A null result ends processing of this chunk here; a non-null result is treated below
                // as the completed, stitched message.
                uncompressedPayload = processMessageChunk(uncompressedPayload, msgMetadata, msgId, messageId, cnx);
                if (uncompressedPayload == null) {
                    return;
                }

                // last chunk received: so, stitch chunked-messages and clear up chunkedMsgBuffer
                if (log.isDebugEnabled()) {
                    log.debug("Chunked message completed chunkId {}, total-chunks {}, msgId {} sequenceId {}",
                            msgMetadata.getChunkId(), msgMetadata.getNumChunksFromMsg(), msgId,
                            msgMetadata.getSequenceId());
                }

                // remove buffer from the map, set the chunk message id
                // NOTE(review): assumes a context is still registered under this uuid whenever
                // processMessageChunk returns non-null; a missing entry would fail on the next line.
                ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.remove(msgMetadata.getUuid());
                if (chunkedMsgCtx.chunkedMessageIds.length > 0) {
                    // The chunk message id is built from the first and the last recorded chunk ids.
                    msgId = new ChunkMessageIdImpl(chunkedMsgCtx.chunkedMessageIds[0],
                            chunkedMsgCtx.chunkedMessageIds[chunkedMsgCtx.chunkedMessageIds.length - 1]);
                }
                // add chunked messageId to unack-message tracker, and reduce pending-chunked-message count
                unAckedChunkedMessageIdSequenceMap.put(msgId, chunkedMsgCtx.chunkedMessageIds);
                pendingChunkedMessageCount--;
                chunkedMsgCtx.recycle();
            }

            // If the topic is non-persistent, we should not ignore any messages.
            if (this.topicName.isPersistent() && isSameEntry(msgId) && isPriorEntryIndex(messageId.getEntryId())) {
                // We need to discard entries that were prior to startMessageId
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
                            consumerName, startMessageId);
                }

                uncompressedPayload.release();
                return;
            }

            final MessageImpl<T> message =
                    newMessage(msgId, brokerEntryMetadata, msgMetadata, uncompressedPayload,
                            schema, redeliveryCount, consumerEpoch);
            // This method's reference to the payload is dropped once the message object has been built.
            uncompressedPayload.release();

            if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null) {
                if (redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                    // Remember the message as a dead-letter candidate once the max redelivery count is reached.
                    possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
                            Collections.singletonList(message));
                    if (redeliveryCount > deadLetterPolicy.getMaxRedeliverCount()) {
                        redeliverUnacknowledgedMessages(Collections.singleton(message.getMessageId()));
                        // The message is skipped due to reaching the max redelivery count,
                        // so we need to increase the available permits
                        increaseAvailablePermits(cnx);
                        return;
                    }
                }
            }
            executeNotifyCallback(message);
        } else {
            // handle batch message enqueuing; uncompressed payload has all messages in batch
            receiveIndividualMessagesFromBatch(brokerEntryMetadata, msgMetadata, redeliveryCount, ackSet,
                    uncompressedPayload, messageId, cnx, consumerEpoch);

            uncompressedPayload.release();
        }
        tryTriggerListener();

    }