private boolean rereplicate()

in bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java [443:543]


    /**
     * Tries to re-replicate the under-replicated fragments of a single ledger.
     *
     * <p>The ledger is opened through {@code admin.openLedgerNoRecovery}, its under-replicated
     * fragments are computed, and every closed fragment whose faulty entries can be read is
     * handed to {@code admin.replicateLedgerFragment}. Open fragments are skipped. Afterwards:
     * <ul>
     *   <li>if an open fragment was seen, or {@code isLastSegmentOpenAndMissingBookies} reports
     *       true, the lock release is handed to {@code deferLedgerLockRelease} and the method
     *       returns {@code false};</li>
     *   <li>otherwise the fragments are recomputed; if none remain the ledger is marked as
     *       replicated and the method returns {@code true}; if some remain the lock release is
     *       handed to {@code deferLedgerLockReleaseOfFailedLedger} and the method returns
     *       {@code false}.</li>
     * </ul>
     *
     * <p>Unless the lock release was successfully handed off to one of the deferral helpers,
     * the under-replicated ledger lock is released in the {@code finally} block before this
     * method returns or throws.
     *
     * @param ledgerIdToReplicate id of the ledger whose fragments should be re-replicated
     * @return {@code true} only when the post-replication check finds no under-replicated
     *         fragments and the ledger has been marked as replicated; {@code false} otherwise
     * @throws InterruptedException propagated from the calls made while opening, reading or
     *         replicating the ledger
     * @throws BKException a {@code BKNotEnoughBookiesException} raised outside the per-fragment
     *         replication call is logged and rethrown; other {@code BKException}s raised inside
     *         the try block are logged and turned into a {@code false} return
     * @throws UnavailableException propagated from the calls made in this method that declare it
     *         (NOTE(review): presumably the underreplication manager calls - confirm)
     */
    private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedException, BKException,
            UnavailableException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Going to replicate the fragments of the ledger: {}", ledgerIdToReplicate);
        }

        // True once the responsibility for releasing the lock has been handed to a deferral helper.
        boolean lockReleaseDeferred = false;

        try (LedgerHandle lh = admin.openLedgerNoRecovery(ledgerIdToReplicate)) {
            Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh,
                    conf.getAuditorLedgerVerificationPercentage());

            if (LOG.isDebugEnabled()) {
                LOG.debug("Founds fragments {} for replication from ledger: {}", fragments, ledgerIdToReplicate);
            }

            boolean foundOpenFragments = false;
            long numFragsReplicated = 0;
            long numNotAdheringPlacementFragsReplicated = 0;
            for (LedgerFragment ledgerFragment : fragments) {
                // Open fragments are not replicated here; they only influence the deferral decision below.
                if (!ledgerFragment.isClosed()) {
                    foundOpenFragments = true;
                    continue;
                }
                if (!tryReadingFaultyEntries(lh, ledgerFragment)) {
                    LOG.error("Failed to read faulty entries, so giving up replicating ledgerFragment {}",
                            ledgerFragment);
                    continue;
                }
                try {
                    admin.replicateLedgerFragment(lh, ledgerFragment, onReadEntryFailureCallback);
                    numFragsReplicated++;
                    if (ledgerFragment.getReplicateType() == LedgerFragment
                            .ReplicateType.DATA_NOT_ADHERING_PLACEMENT) {
                        numNotAdheringPlacementFragsReplicated++;
                    }
                } catch (BKException.BKBookieHandleNotAvailableException e) {
                    // Per-fragment failures are logged and skipped so the remaining fragments are still tried;
                    // the re-check after the loop detects whatever is left under-replicated.
                    LOG.warn("BKBookieHandleNotAvailableException while replicating the fragment", e);
                } catch (BKException.BKLedgerRecoveryException e) {
                    LOG.warn("BKLedgerRecoveryException while replicating the fragment", e);
                } catch (BKException.BKNotEnoughBookiesException e) {
                    LOG.warn("BKNotEnoughBookiesException while replicating the fragment", e);
                }
            }

            // The ledger-level counters are bumped at most once per call, regardless of fragment count.
            if (numFragsReplicated > 0) {
                numLedgersReplicated.inc();
            }
            if (numNotAdheringPlacementFragsReplicated > 0) {
                numNotAdheringPlacementLedgersReplicated.inc();
            }

            if (foundOpenFragments || isLastSegmentOpenAndMissingBookies(lh)) {
                deferLedgerLockRelease(ledgerIdToReplicate);
                // Set the flag only after the hand-off call returned normally: if it throws, the
                // finally block below must still release the lock instead of skipping it.
                lockReleaseDeferred = true;
                return false;
            }

            fragments = getUnderreplicatedFragments(lh, conf.getAuditorLedgerVerificationPercentage());
            if (fragments.isEmpty()) {
                LOG.info("Ledger replicated successfully. ledger id is: {}", ledgerIdToReplicate);
                underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
                return true;
            } else {
                deferLedgerLockReleaseOfFailedLedger(ledgerIdToReplicate);
                // As above: only treat the release as deferred once the hand-off call has succeeded.
                lockReleaseDeferred = true;
                numDeferLedgerLockReleaseOfFailedLedger.inc();
                // Releasing the underReplication ledger lock and compete
                // for the replication again for the pending fragments
                return false;
            }

        } catch (BKNoSuchLedgerExistsOnMetadataServerException e) {
            // Ledger might have been deleted by user
            LOG.info("BKNoSuchLedgerExistsOnMetadataServerException while opening "
                + "ledger {} for replication. Other clients "
                + "might have deleted the ledger. "
                + "So, no harm to continue", ledgerIdToReplicate);
            underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
            getExceptionCounter("BKNoSuchLedgerExistsOnMetadataServerException").inc();
            return false;
        } catch (BKNotEnoughBookiesException e) {
            logBKExceptionAndReleaseLedger(e, ledgerIdToReplicate);
            throw e;
        } catch (BKException e) {
            logBKExceptionAndReleaseLedger(e, ledgerIdToReplicate);
            return false;
        } finally {
            // we make sure we always release the underreplicated lock, unless we decided to defer it. If the lock has
            // already been released, this is a no-op
            if (!lockReleaseDeferred) {
                try {
                    underreplicationManager.releaseUnderreplicatedLedger(ledgerIdToReplicate);
                } catch (UnavailableException e) {
                    LOG.error("UnavailableException while releasing the underreplicated lock for ledger {}:",
                        ledgerIdToReplicate, e);
                    shutdown();
                }
            }
        }
    }