public synchronized void readMoreEntries()

in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java [300:421]


    /**
     * Tries to schedule the next read for this dispatcher.
     *
     * <p>No read is issued when the cursor is closed, a send is still in progress, delivery is paused for the
     * delay tracker, the topic is transferring, or {@code getFirstAvailableConsumerPermits()} is not positive.
     * Otherwise the read budget is taken from {@code calculateToRead}; a {@code -1} in either component of
     * that budget skips the read. After that, the first matching case below wins:
     * <ol>
     *   <li>some replay candidates survive {@code filterOutEntriesWillBeDiscarded}: a replay read is issued;</li>
     *   <li>the dispatcher is flagged as blocked on unacked messages: nothing is read;</li>
     *   <li>no normal read is pending and {@code hasConsumersNeededNormalRead()} holds: a normal cursor read
     *       is issued, unless {@code shouldPauseOnAckStatePersist(ReadType.Normal)} asks to pause;</li>
     *   <li>anything else: nothing is read.</li>
     * </ol>
     */
    public synchronized void readMoreEntries() {
        if (cursor.isClosed()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Cursor is already closed, skipping read more entries.", cursor.getName());
            }
            return;
        }

        // Reading while the previous batch is still being sent could re-read the same entries
        // and therefore deliver duplicates.
        if (isSendInProgress()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Skipping read for the topic, Due to sending in-progress.",
                        topic.getName(), getSubscriptionName());
            }
            return;
        }

        if (shouldPauseDeliveryForDelayTracker()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Skipping read for the topic, Due to pause delivery for delay tracker.",
                        topic.getName(), getSubscriptionName());
            }
            return;
        }

        // A topic that is being transferred would ignore acknowledgments, so nothing is delivered.
        if (topic.isTransferring()) {
            return;
        }

        // totalAvailablePermits may be updated by other threads, so work on a local snapshot.
        final int firstConsumerPermits = getFirstAvailableConsumerPermits();
        final int permitsSnapshot = Math.max(totalAvailablePermits, firstConsumerPermits);
        if (permitsSnapshot <= 0 || firstConsumerPermits <= 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Consumer buffer is full, pause reading", name);
            }
            return;
        }

        final Pair<Integer, Long> readBudget = calculateToRead(permitsSnapshot);
        final int maxMessages = readBudget.getLeft();
        final long maxBytes = readBudget.getRight();
        if (maxMessages == -1 || maxBytes == -1) {
            // The topic/dispatcher exceeded its dispatch rate, or a previous pending read has not completed.
            return;
        }

        final NavigableSet<Position> replayCandidates = getMessagesToReplayNow(maxMessages);
        final NavigableSet<Position> replayable = filterOutEntriesWillBeDiscarded(replayCandidates);

        // Case 1: there is something to replay.
        if (!replayable.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Schedule replay of {} messages for {} consumers", name,
                        replayable.size(), consumerList.size());
            }

            havePendingReplayRead = true;
            // Taken from the unfiltered candidate set, not from the filtered one.
            minReplayedPosition = replayCandidates.first();

            final Set<? extends Position> alreadyDeleted;
            if (topic.isDelayedDeliveryEnabled()) {
                alreadyDeleted = asyncReplayEntriesInOrder(replayable);
            } else {
                alreadyDeleted = asyncReplayEntries(replayable);
            }

            // Positions reported back here were already acked: drop them from the replay bucket.
            for (Position deleted : alreadyDeleted) {
                redeliveryMessages.remove(deleted.getLedgerId(), deleted.getEntryId());
            }

            // When every requested entry turned out to be already acked, the readEntriesComplete
            // callback is never invoked, so the next read has to be triggered from here.
            if (replayable.size() == alreadyDeleted.size()) {
                havePendingReplayRead = false;
                readMoreEntriesAsync();
            }
            return;
        }

        // Case 2: the dispatcher is blocked on unacked messages.
        if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
                        totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription());
            }
            return;
        }

        // Case 4 (checked before case 3 so that case 3 can run unindented): a normal read is not possible.
        if (havePendingRead || !hasConsumersNeededNormalRead()) {
            if (log.isDebugEnabled()) {
                if (!replayCandidates.isEmpty()) {
                    log.debug("[{}] [{}] Skipping read for the topic: because all entries in replay queue were"
                            + " filtered out due to the mechanism of Key_Shared mode, and the left consumers have"
                            + " no permits now",
                            topic.getName(), getSubscriptionName());
                } else {
                    log.debug("[{}] Cannot schedule next read until previous one is done", name);
                }
            }
            return;
        }

        // Case 3: normal read from the cursor.
        if (shouldPauseOnAckStatePersist(ReadType.Normal)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Skipping read for the topic, Due to blocked on ack state persistent.",
                        topic.getName(), getSubscriptionName());
            }
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Schedule read of {} messages for {} consumers", name, maxMessages,
                    consumerList.size());
        }
        havePendingRead = true;

        // Remember the smallest position still waiting for replay (if any) while the normal read runs.
        final NavigableSet<Position> replayHead = getMessagesToReplayNow(1);
        if (replayHead.isEmpty()) {
            minReplayedPosition = null;
        } else {
            minReplayedPosition = replayHead.first();
            // NOTE(review): the position is added back, presumably because getMessagesToReplayNow
            // may take it out of redeliveryMessages - confirm against that helper.
            redeliveryMessages.add(minReplayedPosition.getLedgerId(), minReplayedPosition.getEntryId());
        }

        if (!delayedDeliveryTracker.isPresent()) {
            cursor.asyncReadEntriesOrWait(maxMessages, maxBytes, this, ReadType.Normal,
                    topic.getMaxReadPosition());
            return;
        }

        // Skip entries that the delayed delivery tracker already contains. Only the bucket based
        // tracker provides a skip condition; any other tracker passes a null predicate.
        final DelayedDeliveryTracker tracker = delayedDeliveryTracker.get();
        Predicate<Position> skipDelayed = null;
        if (tracker instanceof BucketDelayedDeliveryTracker) {
            final BucketDelayedDeliveryTracker bucketTracker = (BucketDelayedDeliveryTracker) tracker;
            skipDelayed = position ->
                    bucketTracker.containsMessage(position.getLedgerId(), position.getEntryId());
        }
        cursor.asyncReadEntriesWithSkipOrWait(maxMessages, maxBytes, this, ReadType.Normal,
                topic.getMaxReadPosition(), skipDelayed);
    }