in bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTask.java [166:276]
/**
 * Runs one replicas-check pass over every ledger range returned by
 * {@code ledgerManager.getLedgerRanges(zkOpTimeoutMs)}.
 *
 * <p>For each ledger range the per-range bookkeeping (the two
 * "ledgers with ..." maps and the three {@code numLedgersFound...} counters)
 * is reset, a metadata read is started for each ledger in the range, and the
 * calling thread then waits for the range's {@code MultiCallback} to fire
 * before reporting the findings and moving on to the next range. The number
 * of in-flight per-ledger requests is bounded by a semaphore with
 * {@code MAX_CONCURRENT_REPLICAS_CHECK_LEDGER_REQUESTS} permits.
 *
 * <p>After all ranges were processed successfully, the current wall-clock
 * time is stored through
 * {@code ledgerUnderreplicationManager.setReplicasCheckCTime}.
 *
 * @throws ReplicationException.BKAuditException if iterating the ledger
 *         ranges fails with an {@link IOException}, if acquiring a semaphore
 *         permit or waiting for a range to complete times out
 *         ({@code REPLICAS_CHECK_TIMEOUT_IN_SECS}) or is interrupted, or if
 *         the aggregated result code of a range is not
 *         {@code BKException.Code.OK}
 */
void replicasCheck() throws ReplicationException.BKAuditException {
    ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithMissingEntries =
            new ConcurrentHashMap<>();
    ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithUnavailableBookies =
            new ConcurrentHashMap<>();
    LedgerManager.LedgerRangeIterator ledgerRangeIterator = ledgerManager.getLedgerRanges(zkOpTimeoutMs);
    // Shared across all ledger ranges: caps the number of outstanding per-ledger requests.
    final Semaphore maxConcurrentSemaphore = new Semaphore(MAX_CONCURRENT_REPLICAS_CHECK_LEDGER_REQUESTS);
    while (true) {
        LedgerManager.LedgerRange ledgerRange = null;
        try {
            if (ledgerRangeIterator.hasNext()) {
                ledgerRange = ledgerRangeIterator.next();
            } else {
                break;
            }
        } catch (IOException ioe) {
            LOG.error("Got IOException while iterating LedgerRangeIterator", ioe);
            throw new ReplicationException.BKAuditException(
                    "Got IOException while iterating LedgerRangeIterator", ioe);
        }
        // Start every ledger range with clean per-range state.
        ledgersWithMissingEntries.clear();
        ledgersWithUnavailableBookies.clear();
        numLedgersFoundHavingNoReplicaOfAnEntry.set(0);
        numLedgersFoundHavingLessThanAQReplicasOfAnEntry.set(0);
        numLedgersFoundHavingLessThanWQReplicasOfAnEntry.set(0);
        Set<Long> ledgersInRange = ledgerRange.getLedgers();
        int numOfLedgersInRange = ledgersInRange.size();
        // Final result after processing all the ledgers
        final AtomicInteger resultCode = new AtomicInteger();
        final CountDownLatch replicasCheckLatch = new CountDownLatch(1);
        ReplicasCheckFinalCallback finalCB = new ReplicasCheckFinalCallback(resultCode, replicasCheckLatch);
        MultiCallback mcbForThisLedgerRange = new MultiCallback(numOfLedgersInRange, finalCB, null,
                BKException.Code.OK, BKException.Code.ReadException) {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                try {
                    super.processResult(rc, path, ctx);
                } finally {
                    // Each per-ledger completion hands its permit back, even if
                    // the delegated callback throws.
                    maxConcurrentSemaphore.release();
                }
            }
        };
        if (LOG.isDebugEnabled()) {
            LOG.debug("Number of ledgers in the current LedgerRange : {}",
                    numOfLedgersInRange);
        }
        for (Long ledgerInRange : ledgersInRange) {
            try {
                if (!maxConcurrentSemaphore.tryAcquire(REPLICAS_CHECK_TIMEOUT_IN_SECS, TimeUnit.SECONDS)) {
                    LOG.error("Timedout ({} secs) while waiting for acquiring semaphore",
                            REPLICAS_CHECK_TIMEOUT_IN_SECS);
                    throw new ReplicationException.BKAuditException(
                            "Timedout while waiting for acquiring semaphore");
                }
            } catch (InterruptedException ie) {
                // Restore the interrupt flag before surfacing the failure.
                Thread.currentThread().interrupt();
                LOG.error("Got InterruptedException while acquiring semaphore for replicascheck", ie);
                throw new ReplicationException.BKAuditException(
                        "Got InterruptedException while acquiring semaphore for replicascheck", ie);
            }
            // NOTE(review): the permit acquired above is only released from
            // mcbForThisLedgerRange.processResult; this presumes the helper
            // invokes the callback itself whenever it returns true - confirm.
            if (checkUnderReplicationForReplicasCheck(ledgerInRange, mcbForThisLedgerRange)) {
                /*
                 * if ledger is marked underreplicated, then ignore this
                 * ledger for replicascheck.
                 */
                continue;
            }
            ledgerManager.readLedgerMetadata(ledgerInRange)
                    .whenComplete(new ReadLedgerMetadataCallbackForReplicasCheck(ledgerInRange,
                            mcbForThisLedgerRange, ledgersWithMissingEntries, ledgersWithUnavailableBookies));
        }
        try {
            /*
             * if mcbForThisLedgerRange is not calledback within
             * REPLICAS_CHECK_TIMEOUT_IN_SECS secs then better give up
             * doing replicascheck, since there could be an issue and
             * blocking the single threaded auditor executor thread is not
             * expected.
             */
            if (!replicasCheckLatch.await(REPLICAS_CHECK_TIMEOUT_IN_SECS, TimeUnit.SECONDS)) {
                LOG.error(
                        "For LedgerRange with num of ledgers : {} it didn't complete replicascheck"
                                + " in {} secs, so giving up",
                        numOfLedgersInRange, REPLICAS_CHECK_TIMEOUT_IN_SECS);
                // This is a timeout, not an interrupt: report it as such so the
                // failure cause is not confused with the handler below.
                throw new ReplicationException.BKAuditException(
                        "Timedout while waiting for replicascheck of LedgerRange to complete");
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            LOG.error("Got InterruptedException while doing replicascheck", ie);
            throw new ReplicationException.BKAuditException(
                    "Got InterruptedException while doing replicascheck", ie);
        }
        // Findings are reported before the result code is examined, so they
        // are published even when the range ends up failing the check below.
        reportLedgersWithMissingEntries(ledgersWithMissingEntries);
        reportLedgersWithUnavailableBookies(ledgersWithUnavailableBookies);
        int resultCodeIntValue = resultCode.get();
        if (resultCodeIntValue != BKException.Code.OK) {
            throw new ReplicationException.BKAuditException("Exception while doing replicas check",
                    BKException.create(resultCodeIntValue));
        }
    }
    try {
        ledgerUnderreplicationManager.setReplicasCheckCTime(System.currentTimeMillis());
    } catch (ReplicationException.NonRecoverableReplicationException nre) {
        LOG.error("Non Recoverable Exception while reading from ZK", nre);
        submitShutdownTask();
    } catch (ReplicationException.UnavailableException ue) {
        // Failing to record the check time is logged but does not fail the pass.
        LOG.error("Got exception while trying to set ReplicasCheckCTime", ue);
    }
}