public int filterEntriesForConsumer()

in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java [125:325]


    public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, int startOffset,
                                        List<? extends Entry> entries, EntryBatchSizes batchSizes,
                                        SendMessageInfo sendMessageInfo,
                                        EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor,
                                        boolean isReplayRead, Consumer consumer) {
        int totalMessages = 0;
        long totalBytes = 0;
        int totalChunkedMessages = 0;
        int totalEntries = 0;
        int filteredMessageCount = 0;
        int filteredEntryCount = 0;
        long filteredBytesCount = 0;
        List<Position> entriesToFiltered = hasFilter ? new ArrayList<>() : null;
        List<Position> entriesToRedeliver = hasFilter ? new ArrayList<>() : null;
        for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
            final Entry entry = entries.get(i);
            if (entry == null) {
                continue;
            }
            ByteBuf metadataAndPayload = entry.getDataBuffer();
            final int metadataIndex = i + startOffset;

            MessageMetadata msgMetadata;
            if (metadataArray != null) {
                msgMetadata = metadataArray[metadataIndex];
            } else if (entry instanceof EntryAndMetadata) {
                msgMetadata = ((EntryAndMetadata) entry).getMetadata();
            } else {
                msgMetadata = Commands.peekAndCopyMessageMetadata(metadataAndPayload, subscription.toString(), -1);
            }

            int entryMsgCnt = msgMetadata == null ? 1 : msgMetadata.getNumMessagesInBatch();
            if (hasFilter) {
                this.filterProcessedMsgs.add(entryMsgCnt);
            }

            EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer);
            if (filterResult == EntryFilter.FilterResult.REJECT) {
                entriesToFiltered.add(entry.getPosition());
                entries.set(i, null);
                // FilterResult will be always `ACCEPTED` when there is No Filter
                // dont need to judge whether `hasFilter` is true or not.
                this.filterRejectedMsgs.add(entryMsgCnt);
                filteredEntryCount++;
                filteredMessageCount += entryMsgCnt;
                filteredBytesCount += metadataAndPayload.readableBytes();
                entry.release();
                continue;
            } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
                entriesToRedeliver.add(entry.getPosition());
                entries.set(i, null);
                // FilterResult will be always `ACCEPTED` when there is No Filter
                // dont need to judge whether `hasFilter` is true or not.
                this.filterRescheduledMsgs.add(entryMsgCnt);
                filteredEntryCount++;
                filteredMessageCount += entryMsgCnt;
                filteredBytesCount += metadataAndPayload.readableBytes();
                entry.release();
                continue;
            }
            if (msgMetadata != null && msgMetadata.hasTxnidMostBits()
                    && msgMetadata.hasTxnidLeastBits()) {
                if (Markers.isTxnMarker(msgMetadata)) {
                    if (cursor == null || !cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)) {
                        // because consumer can receive message is smaller than maxReadPosition,
                        // so this marker is useless for this subscription
                        individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()),
                                Collections.emptyMap());
                        entries.set(i, null);
                        entry.release();
                        continue;
                    }
                } else if (((PersistentTopic) subscription.getTopic())
                        .isTxnAborted(new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()),
                                entry.getPosition())) {
                    individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()),
                            Collections.emptyMap());
                    entries.set(i, null);
                    entry.release();
                    continue;
                }
            }

            if (msgMetadata == null || (Markers.isServerOnlyMarker(msgMetadata))) {
                Position pos = entry.getPosition();
                // Message metadata was corrupted or the messages was a server-only marker

                if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
                    final int readerIndex = metadataAndPayload.readerIndex();
                    processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
                    metadataAndPayload.readerIndex(readerIndex);
                }

                // Deliver marker to __compaction cursor to avoid compaction task stuck,
                // and filter out them when doing topic compaction.
                if (msgMetadata == null || cursor == null
                        || !cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)) {
                    entries.set(i, null);
                    entry.release();
                    individualAcknowledgeMessageIfNeeded(Collections.singletonList(pos),
                            Collections.emptyMap());
                    continue;
                }
            } else if (trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
                // The message is marked for delayed delivery. Ignore for now.
                entries.set(i, null);
                entry.release();
                continue;
            }

            if (hasFilter) {
                this.filterAcceptedMsgs.add(entryMsgCnt);
            }

            int batchSize = msgMetadata.getNumMessagesInBatch();
            long[] ackSet = null;
            if (indexesAcks != null && cursor != null) {
                Position position = PositionFactory.create(entry.getLedgerId(), entry.getEntryId());
                ackSet = cursor
                        .getDeletedBatchIndexesAsLongArray(position);
                // some batch messages ack bit sit will be in pendingAck state, so don't send all bit sit to consumer
                if (subscription instanceof PersistentSubscription
                        && ((PersistentSubscription) subscription)
                        .getPendingAckHandle() instanceof PendingAckHandleImpl) {
                    Position positionInPendingAck =
                            ((PersistentSubscription) subscription).getPositionInPendingAck(position);
                    // if this position not in pendingAck state, don't need to do any op
                    if (positionInPendingAck != null) {
                        long[] pendingAckSet = AckSetStateUtil.getAckSetArrayOrNull(positionInPendingAck);
                        if (pendingAckSet != null) {
                            // need to or ackSet in pendingAck state and cursor ackSet which bit sit has been acked
                            if (ackSet != null) {
                                ackSet = andAckSet(ackSet, pendingAckSet);
                            } else {
                                // if actSet is null, use pendingAck ackSet
                                ackSet = pendingAckSet;
                            }
                            // if the result of pendingAckSet(in pendingAckHandle) AND the ackSet(in cursor) is empty
                            // filter this entry
                            if (isAckSetEmpty(ackSet)) {
                                entries.set(i, null);
                                entry.release();
                                continue;
                            }
                        } else {
                            // filter non-batch message in pendingAck state
                            entries.set(i, null);
                            entry.release();
                            continue;
                        }
                    }
                }
                if (ackSet != null) {
                    indexesAcks.setIndexesAcks(i, Pair.of(batchSize, ackSet));
                } else {
                    indexesAcks.setIndexesAcks(i, null);
                }
            }

            totalEntries++;
            totalMessages += batchSize;
            totalBytes += metadataAndPayload.readableBytes();
            totalChunkedMessages += msgMetadata.hasChunkId() ? 1 : 0;
            batchSizes.setBatchSize(i, batchSize);

            BrokerInterceptor interceptor = subscription.interceptor();
            if (null != interceptor) {
                // keep for compatibility if users has implemented the old interface
                interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata);
                interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata, consumer);
            }
        }
        if (CollectionUtils.isNotEmpty(entriesToFiltered)) {
            individualAcknowledgeMessageIfNeeded(entriesToFiltered, Collections.emptyMap());

            int filtered = entriesToFiltered.size();
            Topic topic = subscription.getTopic();
            if (topic instanceof AbstractTopic) {
                ((AbstractTopic) topic).addFilteredEntriesCount(filtered);
            }
        }
        if (CollectionUtils.isNotEmpty(entriesToRedeliver)) {
            this.subscription.getTopic().getBrokerService().getPulsar().getExecutor()
                    .schedule(() -> {
                        // simulate the Consumer rejected the message
                        subscription
                                .redeliverUnacknowledgedMessages(consumer, entriesToRedeliver);
                    }, serviceConfig.getDispatcherEntryFilterRescheduledMessageDelay(), TimeUnit.MILLISECONDS);

        }

        if (serviceConfig.isDispatchThrottlingForFilteredEntriesEnabled()) {
            acquirePermitsForDeliveredMessages(subscription.getTopic(), cursor, filteredEntryCount,
                    filteredMessageCount, filteredBytesCount);
        }

        sendMessageInfo.setTotalMessages(totalMessages);
        sendMessageInfo.setTotalBytes(totalBytes);
        sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
        return totalEntries;
    }