in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java [125:325]
public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, int startOffset,
List<? extends Entry> entries, EntryBatchSizes batchSizes,
SendMessageInfo sendMessageInfo,
EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor,
boolean isReplayRead, Consumer consumer) {
int totalMessages = 0;
long totalBytes = 0;
int totalChunkedMessages = 0;
int totalEntries = 0;
int filteredMessageCount = 0;
int filteredEntryCount = 0;
long filteredBytesCount = 0;
List<Position> entriesToFiltered = hasFilter ? new ArrayList<>() : null;
List<Position> entriesToRedeliver = hasFilter ? new ArrayList<>() : null;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
final Entry entry = entries.get(i);
if (entry == null) {
continue;
}
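            // Resolve the payload buffer and the message metadata: prefer the pre-parsed metadata array,
            // then EntryAndMetadata, otherwise peek and copy the metadata from the payload.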
ByteBuf metadataAndPayload = entry.getDataBuffer();
final int metadataIndex = i + startOffset;
MessageMetadata msgMetadata;
if (metadataArray != null) {
msgMetadata = metadataArray[metadataIndex];
} else if (entry instanceof EntryAndMetadata) {
msgMetadata = ((EntryAndMetadata) entry).getMetadata();
} else {
msgMetadata = Commands.peekAndCopyMessageMetadata(metadataAndPayload, subscription.toString(), -1);
}
int entryMsgCnt = msgMetadata == null ? 1 : msgMetadata.getNumMessagesInBatch();
if (hasFilter) {
this.filterProcessedMsgs.add(entryMsgCnt);
}
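            // Run the entry filters (if any): REJECT drops the entry, RESCHEDULE defers it for
            // redelivery, any other result lets it through.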
EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer);
if (filterResult == EntryFilter.FilterResult.REJECT) {
entriesToFiltered.add(entry.getPosition());
entries.set(i, null);
                // FilterResult is always ACCEPTED when there is no filter,
                // so there is no need to check whether `hasFilter` is true here.
this.filterRejectedMsgs.add(entryMsgCnt);
filteredEntryCount++;
filteredMessageCount += entryMsgCnt;
filteredBytesCount += metadataAndPayload.readableBytes();
entry.release();
continue;
} else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
entriesToRedeliver.add(entry.getPosition());
entries.set(i, null);
                // FilterResult is always ACCEPTED when there is no filter,
                // so there is no need to check whether `hasFilter` is true here.
this.filterRescheduledMsgs.add(entryMsgCnt);
filteredEntryCount++;
filteredMessageCount += entryMsgCnt;
filteredBytesCount += metadataAndPayload.readableBytes();
entry.release();
continue;
}
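            // Transactional entries: drop transaction markers (unless this is the compaction cursor)
            // and entries belonging to aborted transactions, acknowledging them individually if needed.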
if (msgMetadata != null && msgMetadata.hasTxnidMostBits()
&& msgMetadata.hasTxnidLeastBits()) {
if (Markers.isTxnMarker(msgMetadata)) {
if (cursor == null || !cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)) {
                        // consumers can only receive messages whose position is smaller than maxReadPosition,
                        // so this marker is useless for this subscription
individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()),
Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
}
} else if (((PersistentTopic) subscription.getTopic())
.isTxnAborted(new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()),
entry.getPosition())) {
individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()),
Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
}
}
if (msgMetadata == null || (Markers.isServerOnlyMarker(msgMetadata))) {
Position pos = entry.getPosition();
                // Message metadata was corrupted or the message was a server-only marker
if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
final int readerIndex = metadataAndPayload.readerIndex();
processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
metadataAndPayload.readerIndex(readerIndex);
}
                // Deliver markers to the __compaction cursor to avoid the compaction task getting stuck;
                // they are filtered out when doing topic compaction.
if (msgMetadata == null || cursor == null
|| !cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)) {
entries.set(i, null);
entry.release();
individualAcknowledgeMessageIfNeeded(Collections.singletonList(pos),
Collections.emptyMap());
continue;
}
} else if (trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
// The message is marked for delayed delivery. Ignore for now.
entries.set(i, null);
entry.release();
continue;
}
if (hasFilter) {
this.filterAcceptedMsgs.add(entryMsgCnt);
}
int batchSize = msgMetadata.getNumMessagesInBatch();
long[] ackSet = null;
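            // Build the batch-index ack set from the cursor and the pending ack state so batch indexes
            // that are already acked, or held in pending ack, are not sent to the consumer.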
if (indexesAcks != null && cursor != null) {
Position position = PositionFactory.create(entry.getLedgerId(), entry.getEntryId());
ackSet = cursor
.getDeletedBatchIndexesAsLongArray(position);
                // some batch message bits may be in pending ack state, so don't send the full bit set to the consumer
if (subscription instanceof PersistentSubscription
&& ((PersistentSubscription) subscription)
.getPendingAckHandle() instanceof PendingAckHandleImpl) {
Position positionInPendingAck =
((PersistentSubscription) subscription).getPositionInPendingAck(position);
                    // if this position is not in pending ack state, nothing needs to be done
if (positionInPendingAck != null) {
long[] pendingAckSet = AckSetStateUtil.getAckSetArrayOrNull(positionInPendingAck);
if (pendingAckSet != null) {
                            // combine (AND) the cursor ackSet with the ackSet held in pending ack state
if (ackSet != null) {
ackSet = andAckSet(ackSet, pendingAckSet);
} else {
                                // if ackSet is null, use the pending ack ackSet
ackSet = pendingAckSet;
}
                            // if ANDing the pendingAckSet (from the PendingAckHandle) with the ackSet (from the cursor)
                            // yields an empty set, filter out this entry
if (isAckSetEmpty(ackSet)) {
entries.set(i, null);
entry.release();
continue;
}
} else {
                            // filter out a non-batch message that is in pending ack state
entries.set(i, null);
entry.release();
continue;
}
}
}
if (ackSet != null) {
indexesAcks.setIndexesAcks(i, Pair.of(batchSize, ackSet));
} else {
indexesAcks.setIndexesAcks(i, null);
}
}
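            // The entry will be dispatched: update the running totals, record its batch size
            // and notify the broker interceptor.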
totalEntries++;
totalMessages += batchSize;
totalBytes += metadataAndPayload.readableBytes();
totalChunkedMessages += msgMetadata.hasChunkId() ? 1 : 0;
batchSizes.setBatchSize(i, batchSize);
BrokerInterceptor interceptor = subscription.interceptor();
if (null != interceptor) {
                // keep the old call for compatibility in case users have implemented the old interface
interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata);
interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata, consumer);
}
}
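        // Acknowledge the entries rejected by a filter and add them to the topic's filtered-entries counter.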
if (CollectionUtils.isNotEmpty(entriesToFiltered)) {
individualAcknowledgeMessageIfNeeded(entriesToFiltered, Collections.emptyMap());
int filtered = entriesToFiltered.size();
Topic topic = subscription.getTopic();
if (topic instanceof AbstractTopic) {
((AbstractTopic) topic).addFilteredEntriesCount(filtered);
}
}
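        // Redeliver the entries that a filter asked to RESCHEDULE after the configured delay.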
if (CollectionUtils.isNotEmpty(entriesToRedeliver)) {
this.subscription.getTopic().getBrokerService().getPulsar().getExecutor()
.schedule(() -> {
                        // simulate the consumer rejecting the message
subscription
.redeliverUnacknowledgedMessages(consumer, entriesToRedeliver);
}, serviceConfig.getDispatcherEntryFilterRescheduledMessageDelay(), TimeUnit.MILLISECONDS);
}
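        // When dispatch throttling for filtered entries is enabled, have the filtered entries
        // consume dispatch permits as well.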
if (serviceConfig.isDispatchThrottlingForFilteredEntriesEnabled()) {
acquirePermitsForDeliveredMessages(subscription.getTopic(), cursor, filteredEntryCount,
filteredMessageCount, filteredBytesCount);
}
sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
return totalEntries;
}