in bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java [443:543]
private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedException, BKException,
UnavailableException {
if (LOG.isDebugEnabled()) {
LOG.debug("Going to replicate the fragments of the ledger: {}", ledgerIdToReplicate);
}
boolean deferLedgerLockRelease = false;
try (LedgerHandle lh = admin.openLedgerNoRecovery(ledgerIdToReplicate)) {
Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh,
conf.getAuditorLedgerVerificationPercentage());
if (LOG.isDebugEnabled()) {
LOG.debug("Founds fragments {} for replication from ledger: {}", fragments, ledgerIdToReplicate);
}
boolean foundOpenFragments = false;
long numFragsReplicated = 0;
long numNotAdheringPlacementFragsReplicated = 0;
for (LedgerFragment ledgerFragment : fragments) {
if (!ledgerFragment.isClosed()) {
foundOpenFragments = true;
continue;
}
if (!tryReadingFaultyEntries(lh, ledgerFragment)) {
LOG.error("Failed to read faulty entries, so giving up replicating ledgerFragment {}",
ledgerFragment);
continue;
}
try {
admin.replicateLedgerFragment(lh, ledgerFragment, onReadEntryFailureCallback);
numFragsReplicated++;
if (ledgerFragment.getReplicateType() == LedgerFragment
.ReplicateType.DATA_NOT_ADHERING_PLACEMENT) {
numNotAdheringPlacementFragsReplicated++;
}
} catch (BKException.BKBookieHandleNotAvailableException e) {
LOG.warn("BKBookieHandleNotAvailableException while replicating the fragment", e);
} catch (BKException.BKLedgerRecoveryException e) {
LOG.warn("BKLedgerRecoveryException while replicating the fragment", e);
} catch (BKException.BKNotEnoughBookiesException e) {
LOG.warn("BKNotEnoughBookiesException while replicating the fragment", e);
}
}
if (numFragsReplicated > 0) {
numLedgersReplicated.inc();
}
if (numNotAdheringPlacementFragsReplicated > 0) {
numNotAdheringPlacementLedgersReplicated.inc();
}
if (foundOpenFragments || isLastSegmentOpenAndMissingBookies(lh)) {
deferLedgerLockRelease = true;
deferLedgerLockRelease(ledgerIdToReplicate);
return false;
}
fragments = getUnderreplicatedFragments(lh, conf.getAuditorLedgerVerificationPercentage());
if (fragments.size() == 0) {
LOG.info("Ledger replicated successfully. ledger id is: " + ledgerIdToReplicate);
underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
return true;
} else {
deferLedgerLockRelease = true;
deferLedgerLockReleaseOfFailedLedger(ledgerIdToReplicate);
numDeferLedgerLockReleaseOfFailedLedger.inc();
// Releasing the underReplication ledger lock and compete
// for the replication again for the pending fragments
return false;
}
} catch (BKNoSuchLedgerExistsOnMetadataServerException e) {
// Ledger might have been deleted by user
LOG.info("BKNoSuchLedgerExistsOnMetadataServerException while opening "
+ "ledger {} for replication. Other clients "
+ "might have deleted the ledger. "
+ "So, no harm to continue", ledgerIdToReplicate);
underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
getExceptionCounter("BKNoSuchLedgerExistsOnMetadataServerException").inc();
return false;
} catch (BKNotEnoughBookiesException e) {
logBKExceptionAndReleaseLedger(e, ledgerIdToReplicate);
throw e;
} catch (BKException e) {
logBKExceptionAndReleaseLedger(e, ledgerIdToReplicate);
return false;
} finally {
// we make sure we always release the underreplicated lock, unless we decided to defer it. If the lock has
// already been released, this is a no-op
if (!deferLedgerLockRelease) {
try {
underreplicationManager.releaseUnderreplicatedLedger(ledgerIdToReplicate);
} catch (UnavailableException e) {
LOG.error("UnavailableException while releasing the underreplicated lock for ledger {}:",
ledgerIdToReplicate, e);
shutdown();
}
}
}
}