private void fetchIfNeeded()

in src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCache.java [136:176]


    /**
     * Triggers a fetch when the requested min offset is smaller than the
     * min offset currently reported by {@code getMinOffset()}; otherwise
     * does nothing.
     *
     * <p>Fetching is serialized through {@code headPollerLock}: the lock is
     * acquired with a timeout of {@code MAX_FETCH_WAIT_MS} milliseconds and
     * the min offset is read again once the lock is held, so that the fetch
     * is skipped when it is no longer needed.
     *
     * @param requestedMinOffset the smallest offset the caller needs
     * @throws InterruptedException if the current thread is interrupted,
     *         e.g. while waiting for the head poller lock
     * @throws RuntimeException if the head poller lock could not be acquired
     *         within {@code MAX_FETCH_WAIT_MS} milliseconds
     */
    private void fetchIfNeeded(long requestedMinOffset) throws InterruptedException {
        final long minOffsetBeforeLock = getMinOffset();
        if (requestedMinOffset >= minOffsetBeforeLock) {
            // Nothing to do: the requested offset is not below the cached min offset.
            return;
        }

        LOG.debug("Requested min offset {} smaller than cached min offset {}", requestedMinOffset, minOffsetBeforeLock);

        // Fetching from a topic is expensive, and callers are expected to ask
        // for roughly the same data at roughly the same time. To keep the cost
        // down, only one head poller may run at once: the headPollerLock
        // serializes them, so a concurrent caller waits here (up to
        // MAX_FETCH_WAIT_MS) until the running head poller is done.
        if (!headPollerLock.tryLock(MAX_FETCH_WAIT_MS, MILLISECONDS)) {
            throw new RuntimeException(String.format(
                    "Gave up fetching the queue state after %d ms because another thread holds the lock "
                            + "(requested offset = %d, cached min offset = %d)",
                    MAX_FETCH_WAIT_MS, requestedMinOffset, minOffsetBeforeLock));
        }

        try {
            // Re-read the min offset now that the lock is held: another
            // thread may have fetched the data while this one was waiting.
            final long minOffsetUnderLock = getMinOffset();
            if (requestedMinOffset < minOffsetUnderLock) {
                fetch(requestedMinOffset, minOffsetUnderLock);
            }
        } finally {
            headPollerLock.unlock();
        }
    }