in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java [300:421]
public synchronized void readMoreEntries() {
if (cursor.isClosed()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor is already closed, skipping read more entries.", cursor.getName());
}
return;
}
if (isSendInProgress()) {
// we cannot read more entries while sending the previous batch
// otherwise we could re-read the same entries and send duplicates
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to sending in-progress.",
topic.getName(), getSubscriptionName());
}
return;
}
if (shouldPauseDeliveryForDelayTracker()) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to pause delivery for delay tracker.",
topic.getName(), getSubscriptionName());
}
return;
}
if (topic.isTransferring()) {
// Do not deliver messages for topics that are undergoing transfer, as the acknowledgments would be ignored.
return;
}
// totalAvailablePermits may be updated by other threads
int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
int currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits);
if (currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0) {
Pair<Integer, Long> calculateResult = calculateToRead(currentTotalAvailablePermits);
int messagesToRead = calculateResult.getLeft();
long bytesToRead = calculateResult.getRight();
if (messagesToRead == -1 || bytesToRead == -1) {
// Skip read as topic/dispatcher has exceed the dispatch rate or previous pending read hasn't complete.
return;
}
NavigableSet<Position> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
NavigableSet<Position> messagesToReplayFiltered = filterOutEntriesWillBeDiscarded(messagesToReplayNow);
if (!messagesToReplayFiltered.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule replay of {} messages for {} consumers", name,
messagesToReplayFiltered.size(), consumerList.size());
}
havePendingReplayRead = true;
minReplayedPosition = messagesToReplayNow.first();
Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled()
? asyncReplayEntriesInOrder(messagesToReplayFiltered)
: asyncReplayEntries(messagesToReplayFiltered);
// clear already acked positions from replay bucket
deletedMessages.forEach(position -> redeliveryMessages.remove(((Position) position).getLedgerId(),
((Position) position).getEntryId()));
// if all the entries are acked-entries and cleared up from redeliveryMessages, try to read
// next entries as readCompletedEntries-callback was never called
if ((messagesToReplayFiltered.size() - deletedMessages.size()) == 0) {
havePendingReplayRead = false;
readMoreEntriesAsync();
}
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
if (log.isDebugEnabled()) {
log.debug("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription());
}
} else if (!havePendingRead && hasConsumersNeededNormalRead()) {
if (shouldPauseOnAckStatePersist(ReadType.Normal)) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to blocked on ack state persistent.",
topic.getName(), getSubscriptionName());
}
return;
}
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
consumerList.size());
}
havePendingRead = true;
NavigableSet<Position> toReplay = getMessagesToReplayNow(1);
if (!toReplay.isEmpty()) {
minReplayedPosition = toReplay.first();
redeliveryMessages.add(minReplayedPosition.getLedgerId(), minReplayedPosition.getEntryId());
} else {
minReplayedPosition = null;
}
// Filter out and skip read delayed messages exist in DelayedDeliveryTracker
if (delayedDeliveryTracker.isPresent()) {
Predicate<Position> skipCondition = null;
final DelayedDeliveryTracker deliveryTracker = delayedDeliveryTracker.get();
if (deliveryTracker instanceof BucketDelayedDeliveryTracker) {
skipCondition = position -> ((BucketDelayedDeliveryTracker) deliveryTracker)
.containsMessage(position.getLedgerId(), position.getEntryId());
}
cursor.asyncReadEntriesWithSkipOrWait(messagesToRead, bytesToRead, this, ReadType.Normal,
topic.getMaxReadPosition(), skipCondition);
} else {
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, ReadType.Normal,
topic.getMaxReadPosition());
}
} else {
if (log.isDebugEnabled()) {
if (!messagesToReplayNow.isEmpty()) {
log.debug("[{}] [{}] Skipping read for the topic: because all entries in replay queue were"
+ " filtered out due to the mechanism of Key_Shared mode, and the left consumers have"
+ " no permits now",
topic.getName(), getSubscriptionName());
} else {
log.debug("[{}] Cannot schedule next read until previous one is done", name);
}
}
}
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer buffer is full, pause reading", name);
}
}
}