void replicasCheck()

in bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTask.java [166:276]


    /**
     * Runs the replicas check over every ledger range produced by the ledger manager.
     *
     * <p>For each {@link LedgerManager.LedgerRange}, the ledgers are checked concurrently. At most
     * {@code MAX_CONCURRENT_REPLICAS_CHECK_LEDGER_REQUESTS} ledgers are in flight at once, enforced
     * by a semaphore that is released whenever a ledger's callback fires. Ledgers for which
     * {@code checkUnderReplicationForReplicasCheck} returns {@code true} are skipped. For all other
     * ledgers the metadata is read and handed to {@code ReadLedgerMetadataCallbackForReplicasCheck}.
     * Once every ledger of the range has called back, the ledgers found with missing entries and
     * with unavailable bookies are reported. If any ledger completed with a non-OK result code, the
     * check is aborted.
     *
     * <p>After all ranges have been processed, the replicas-check ctime is stored via
     * {@code ledgerUnderreplicationManager}. A non-recoverable failure there triggers
     * {@code submitShutdownTask()}. An unavailable failure is only logged.
     *
     * @throws ReplicationException.BKAuditException if iterating the ledger ranges fails, if
     *         acquiring a concurrency permit or waiting for a range to finish times out or is
     *         interrupted, or if any ledger in a range completes with a non-OK result code
     */
    void replicasCheck() throws ReplicationException.BKAuditException {
        ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithMissingEntries =
                new ConcurrentHashMap<>();
        ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithUnavailableBookies =
                new ConcurrentHashMap<>();
        LedgerManager.LedgerRangeIterator ledgerRangeIterator = ledgerManager.getLedgerRanges(zkOpTimeoutMs);
        // Bounds the number of ledgers whose check is in flight; released in the MultiCallback below.
        final Semaphore maxConcurrentSemaphore = new Semaphore(MAX_CONCURRENT_REPLICAS_CHECK_LEDGER_REQUESTS);
        while (true) {
            LedgerManager.LedgerRange ledgerRange;
            try {
                if (!ledgerRangeIterator.hasNext()) {
                    break;
                }
                ledgerRange = ledgerRangeIterator.next();
            } catch (IOException ioe) {
                LOG.error("Got IOException while iterating LedgerRangeIterator", ioe);
                throw new ReplicationException.BKAuditException(
                        "Got IOException while iterating LedgerRangeIterator", ioe);
            }
            // The result maps are reused across ranges, so start each range from a clean state.
            ledgersWithMissingEntries.clear();
            ledgersWithUnavailableBookies.clear();
            // NOTE(review): these counters are reset per range. If they are meant to aggregate over
            // the whole replicas check, only the last range's counts survive. Confirm this is intended.
            numLedgersFoundHavingNoReplicaOfAnEntry.set(0);
            numLedgersFoundHavingLessThanAQReplicasOfAnEntry.set(0);
            numLedgersFoundHavingLessThanWQReplicasOfAnEntry.set(0);
            Set<Long> ledgersInRange = ledgerRange.getLedgers();
            int numOfLedgersInRange = ledgersInRange.size();
            // Final result after processing all the ledgers
            final AtomicInteger resultCode = new AtomicInteger();
            final CountDownLatch replicasCheckLatch = new CountDownLatch(1);

            ReplicasCheckFinalCallback finalCB = new ReplicasCheckFinalCallback(resultCode, replicasCheckLatch);
            MultiCallback mcbForThisLedgerRange = new MultiCallback(numOfLedgersInRange, finalCB, null,
                    BKException.Code.OK, BKException.Code.ReadException) {
                @Override
                public void processResult(int rc, String path, Object ctx) {
                    try {
                        super.processResult(rc, path, ctx);
                    } finally {
                        // Every per-ledger completion frees one concurrency permit.
                        maxConcurrentSemaphore.release();
                    }
                }
            };
            if (LOG.isDebugEnabled()) {
                LOG.debug("Number of ledgers in the current LedgerRange : {}",
                        numOfLedgersInRange);
            }
            for (Long ledgerInRange : ledgersInRange) {
                try {
                    if (!maxConcurrentSemaphore.tryAcquire(REPLICAS_CHECK_TIMEOUT_IN_SECS, TimeUnit.SECONDS)) {
                        LOG.error("Timedout ({} secs) while waiting for acquiring semaphore",
                                REPLICAS_CHECK_TIMEOUT_IN_SECS);
                        throw new ReplicationException.BKAuditException(
                                "Timedout while waiting for acquiring semaphore");
                    }
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    LOG.error("Got InterruptedException while acquiring semaphore for replicascheck", ie);
                    throw new ReplicationException.BKAuditException(
                            "Got InterruptedException while acquiring semaphore for replicascheck", ie);
                }
                if (checkUnderReplicationForReplicasCheck(ledgerInRange, mcbForThisLedgerRange)) {
                    /*
                     * if ledger is marked underreplicated, then ignore this
                     * ledger for replicascheck.
                     */
                    continue;
                }
                ledgerManager.readLedgerMetadata(ledgerInRange)
                        .whenComplete(new ReadLedgerMetadataCallbackForReplicasCheck(ledgerInRange,
                                mcbForThisLedgerRange, ledgersWithMissingEntries, ledgersWithUnavailableBookies));
            }
            try {
                /*
                 * If mcbForThisLedgerRange is not called back within
                 * REPLICAS_CHECK_TIMEOUT_IN_SECS secs, give up on the
                 * replicascheck. There could be an issue, and blocking the
                 * single-threaded auditor executor thread is not expected.
                 */
                if (!replicasCheckLatch.await(REPLICAS_CHECK_TIMEOUT_IN_SECS, TimeUnit.SECONDS)) {
                    LOG.error(
                            "For LedgerRange with num of ledgers : {} it didn't complete replicascheck"
                                    + " in {} secs, so giving up",
                            numOfLedgersInRange, REPLICAS_CHECK_TIMEOUT_IN_SECS);
                    // This is a timeout, not an interruption: report it as such.
                    throw new ReplicationException.BKAuditException(
                            "Timedout (" + REPLICAS_CHECK_TIMEOUT_IN_SECS + " secs) while doing replicascheck"
                                    + " for LedgerRange with num of ledgers : " + numOfLedgersInRange);
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                LOG.error("Got InterruptedException while doing replicascheck", ie);
                throw new ReplicationException.BKAuditException(
                        "Got InterruptedException while doing replicascheck", ie);
            }
            // Report whatever was collected for this range before checking the aggregate result code.
            reportLedgersWithMissingEntries(ledgersWithMissingEntries);
            reportLedgersWithUnavailableBookies(ledgersWithUnavailableBookies);
            int resultCodeIntValue = resultCode.get();
            if (resultCodeIntValue != BKException.Code.OK) {
                throw new ReplicationException.BKAuditException("Exception while doing replicas check",
                        BKException.create(resultCodeIntValue));
            }
        }
        try {
            ledgerUnderreplicationManager.setReplicasCheckCTime(System.currentTimeMillis());
        } catch (ReplicationException.NonRecoverableReplicationException nre) {
            LOG.error("Non Recoverable Exception while reading from ZK", nre);
            submitShutdownTask();
        } catch (ReplicationException.UnavailableException ue) {
            LOG.error("Got exception while trying to set ReplicasCheckCTime", ue);
        }
    }