in src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCache.java [136:176]
private void fetchIfNeeded(long requestedMinOffset) throws InterruptedException {
long cachedMinOffset = getMinOffset();
if (requestedMinOffset < cachedMinOffset) {
LOG.debug("Requested min offset {} smaller than cached min offset {}", requestedMinOffset, cachedMinOffset);
// Fetching data from a topic is a costly
// operation. In most cases, we expect the queues
// to be roughly at the same state, and thus attempt
// to fetch roughly the same data concurrently.
// In order to minimize the cost, we limit to
// running a single head poller at the same time.
//
// This implies that concurrent requests that require
// a head poller will block until the head poller is
// available. The headPollerLock guarantees to not
// run head pollers concurrently.
boolean locked = headPollerLock.tryLock(MAX_FETCH_WAIT_MS, MILLISECONDS);
if (! locked) {
String msg = String.format(
"Gave up fetching the queue state after %d ms because another thread holds the lock "
+ "(requested offset = %d, cached min offset = %d)",
MAX_FETCH_WAIT_MS, requestedMinOffset, cachedMinOffset);
throw new RuntimeException(msg);
}
try {
// Once the headPollerLock has been acquired,
// we check whether the data must still be
// fetched. The data may have been fetched
// while waiting on the lock.
cachedMinOffset = getMinOffset();
if (requestedMinOffset < cachedMinOffset) {
fetch(requestedMinOffset, cachedMinOffset);
}
} finally {
headPollerLock.unlock();
}
}
}