in bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java [807:1011]
/**
 * Re-replicates every fragment of a single ledger whose ensemble contains one of the
 * given source bookies, then reports the outcome for this ledger to
 * {@code finalLedgerIterCb}.
 *
 * <p>The ledger is first opened without recovery. If its metadata is not closed and the
 * last ensemble still contains a source bookie (and this is not a dry run), that handle is
 * closed, the ledger is opened again with recovery, and this method is re-submitted to the
 * main worker pool. The original writer may still be changing the ensemble, so the ledger
 * is closed before any fragment is copied.
 *
 * @param bookiesSrc bookies whose fragments must be re-replicated elsewhere
 * @param lId id of the ledger to recover
 * @param dryrun when true, only log (via {@code VERBOSE}) the planned ensemble
 *        replacements; no fragment is replicated and the ledger is not re-opened with recovery
 * @param skipOpenLedgers when true, a ledger whose metadata state is {@code OPEN} is
 *        skipped and reported as OK
 * @param skipUnrecoverableLedgers when true, open/recovery failures are logged as warnings
 *        and reported to {@code finalLedgerIterCb} as {@code BKException.Code.OK}
 * @param finalLedgerIterCb completion callback for this ledger; receives OK on success or
 *        skip, otherwise a BK error code
 */
private void recoverLedger(final Set<BookieId> bookiesSrc, final long lId, final boolean dryrun,
                           final boolean skipOpenLedgers, final boolean skipUnrecoverableLedgers,
                           final AsyncCallback.VoidCallback finalLedgerIterCb) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Recovering ledger : {}", lId);
    }
    asyncOpenLedgerNoRecovery(lId, new OpenCallback() {
        @Override
        public void openComplete(int rc, final LedgerHandle lh, Object ctx) {
            if (rc != BKException.Code.OK) {
                if (skipUnrecoverableLedgers) {
                    LOG.warn("BK error opening ledger: {}, skip recover it.", lId, BKException.create(rc));
                    finalLedgerIterCb.processResult(BKException.Code.OK, null, null);
                } else {
                    LOG.error("BK error opening ledger: {}", lId, BKException.create(rc));
                    finalLedgerIterCb.processResult(rc, null, null);
                }
                return;
            }
            // Take one metadata snapshot and use it for every decision below (fencing,
            // fragment ranges, ensembles), instead of re-reading the handle's metadata at
            // different points in time.
            final LedgerMetadata lm = lh.getLedgerMetadata();
            if (skipOpenLedgers && lm.getState() == LedgerMetadata.State.OPEN) {
                LOG.info("Skip recovering open ledger {}.", lId);
                try {
                    lh.close();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                } catch (BKException bke) {
                    LOG.warn("Error on closing ledger handle for {}.", lId, bke);
                }
                finalLedgerIterCb.processResult(BKException.Code.OK, null, null);
                return;
            }
            final boolean fenceRequired = !lm.isClosed() && containBookiesInLastEnsemble(lm, bookiesSrc);
            // the original write has not removed faulty bookie from
            // current ledger ensemble. to avoid data loss issue in
            // the case of concurrent updates to the ensemble composition,
            // the recovery tool should first close the ledger
            if (!dryrun && fenceRequired) {
                // close opened non recovery ledger handle
                try {
                    lh.close();
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag, as the other close sites in this method do.
                    Thread.currentThread().interrupt();
                    LOG.warn("Error closing non recovery ledger handle for ledger {}", lId, ie);
                } catch (Exception e) {
                    LOG.warn("Error closing non recovery ledger handle for ledger {}", lId, e);
                }
                asyncOpenLedger(lId, new OpenCallback() {
                    @Override
                    public void openComplete(int newrc, final LedgerHandle newlh, Object newctx) {
                        if (newrc != BKException.Code.OK) {
                            if (skipUnrecoverableLedgers) {
                                LOG.warn("BK error opening ledger: {}, skip recover it.",
                                        lId, BKException.create(newrc));
                                finalLedgerIterCb.processResult(BKException.Code.OK, null, null);
                            } else {
                                LOG.error("BK error opening ledger with recovery: {}", lId,
                                        BKException.create(newrc));
                                finalLedgerIterCb.processResult(newrc, null, null);
                            }
                            return;
                        }
                        // The recovery open is done only for its effect on the ledger; this
                        // handle is never used again. Release it without blocking the callback.
                        newlh.asyncClose((closeRc, closedLh, closeCtx) -> {
                            if (closeRc != BKException.Code.OK) {
                                LOG.warn("Error on closing ledger handle for {}.", lId,
                                        BKException.create(closeRc));
                            }
                        }, null);
                        bkc.mainWorkerPool.submit(() -> {
                            // do recovery against the now-updated metadata
                            recoverLedger(bookiesSrc, lId, dryrun, skipOpenLedgers,
                                    skipUnrecoverableLedgers, finalLedgerIterCb);
                        });
                    }
                }, null);
                return;
            }
            // Per-ledger completion: logs the outcome, optionally downgrades a failure to OK,
            // closes the no-recovery handle and forwards the result to finalLedgerIterCb.
            final AsyncCallback.VoidCallback ledgerIterCb = new AsyncCallback.VoidCallback() {
                @Override
                public void processResult(int rc, String path, Object ctx) {
                    if (BKException.Code.OK != rc) {
                        if (skipUnrecoverableLedgers) {
                            LOG.warn("Failed to recover ledger: {} : {}, skip recover it.", lId,
                                    BKException.codeLogger(rc));
                            rc = BKException.Code.OK;
                        } else {
                            LOG.error("Failed to recover ledger {} : {}", lId, BKException.codeLogger(rc));
                        }
                    } else {
                        LOG.info("Recovered ledger {}.", lId);
                    }
                    try {
                        lh.close();
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    } catch (BKException bke) {
                        LOG.warn("Error on closing ledger handle for {}.", lId, bke);
                    }
                    finalLedgerIterCb.processResult(rc, path, ctx);
                }
            };
            /*
             * This List stores the ledger fragments to recover indexed by
             * the start entry ID for the range. The ensembles map is
             * keyed off this.
             */
            final List<Long> ledgerFragmentsToRecover = new LinkedList<Long>();
            /*
             * This Map stores the start and end entry ID values for each
             * ledger fragment range. Every fragment ends one entry before the
             * next ensemble starts; the last fragment ends at the handle's
             * last add confirmed.
             */
            Map<Long, Long> ledgerFragmentsRange = new HashMap<Long, Long>();
            final Map<Long, ? extends List<BookieId>> ensembles = lm.getAllEnsembles();
            Long curEntryId = null;
            for (Map.Entry<Long, ? extends List<BookieId>> entry : ensembles.entrySet()) {
                if (curEntryId != null) {
                    ledgerFragmentsRange.put(curEntryId, entry.getKey() - 1);
                }
                curEntryId = entry.getKey();
                if (containBookies(entry.getValue(), bookiesSrc)) {
                    /*
                     * Current ledger fragment has entries stored on the
                     * dead bookie so we'll need to recover them.
                     */
                    ledgerFragmentsToRecover.add(entry.getKey());
                }
            }
            // add last ensemble otherwise if the failed bookie existed in
            // the last ensemble of a closed ledger. the entries belonged to
            // last ensemble would not be replicated.
            if (curEntryId != null) {
                ledgerFragmentsRange.put(curEntryId, lh.getLastAddConfirmed());
            }
            /*
             * See if this current ledger contains any ledger fragment that
             * needs to be re-replicated. If not, then just invoke the
             * ledger callback and return.
             */
            if (ledgerFragmentsToRecover.size() == 0) {
                ledgerIterCb.processResult(BKException.Code.OK, null, null);
                return;
            }
            if (dryrun) {
                VERBOSE.info("Recovered ledger {} : {}", lId, (fenceRequired ? "[fence required]" : ""));
            }
            /*
             * Multicallback for ledger. Once all fragments for the ledger have been recovered
             * trigger the ledgerIterCb. It must receive exactly one result per fragment.
             */
            MultiCallback ledgerFragmentsMcb = new MultiCallback(ledgerFragmentsToRecover.size(), ledgerIterCb,
                    null, BKException.Code.OK, BKException.Code.LedgerRecoveryException);
            /*
             * Now recover all of the necessary ledger fragments
             * asynchronously using a MultiCallback for every fragment.
             */
            int fragmentsAfterThis = ledgerFragmentsToRecover.size();
            for (final Long startEntryId : ledgerFragmentsToRecover) {
                // number of fragments that come after the current one in this loop
                fragmentsAfterThis--;
                Long endEntryId = ledgerFragmentsRange.get(startEntryId);
                List<BookieId> ensemble = ensembles.get(startEntryId);
                // Get bookies to replace
                Map<Integer, BookieId> targetBookieAddresses;
                try {
                    targetBookieAddresses = getReplacementBookies(lh, ensemble, bookiesSrc);
                } catch (BKException.BKNotEnoughBookiesException e) {
                    if (!dryrun) {
                        ledgerFragmentsMcb.processResult(BKException.Code.NotEnoughBookiesException, null, null);
                    } else {
                        VERBOSE.info("  Fragment [{} - {}] : {}", startEntryId, endEntryId,
                                BKException.getMessage(BKException.Code.NotEnoughBookiesException));
                    }
                    continue;
                }
                if (dryrun) {
                    ArrayList<BookieId> newEnsemble =
                            replaceBookiesInEnsemble(ensemble, targetBookieAddresses);
                    VERBOSE.info("  Fragment [{} - {}] : ", startEntryId, endEntryId);
                    VERBOSE.info("    old ensemble : {}", formatEnsemble(ensemble, bookiesSrc, '*'));
                    VERBOSE.info("    new ensemble : {}", formatEnsemble(newEnsemble, bookiesSrc, '*'));
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Replicating fragment from [{}, {}] of ledger {} to {}",
                                startEntryId, endEntryId, lh.getId(), targetBookieAddresses);
                    }
                    try {
                        LedgerFragmentReplicator.SingleFragmentCallback cb =
                                new LedgerFragmentReplicator.SingleFragmentCallback(ledgerFragmentsMcb, lh,
                                        bkc.getLedgerManager(),
                                        startEntryId, getReplacementBookiesMap(ensemble, targetBookieAddresses));
                        LedgerFragment ledgerFragment = new LedgerFragment(lh,
                                startEntryId, endEntryId, targetBookieAddresses.keySet());
                        asyncRecoverLedgerFragment(lh, ledgerFragment, cb,
                                Sets.newHashSet(targetBookieAddresses.values()), NOOP_BICONSUMER);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        // Fail this fragment and every fragment not yet dispatched so the
                        // MultiCallback still reaches its expected count. Otherwise
                        // ledgerIterCb would never run, lh would never be closed and
                        // finalLedgerIterCb would never be notified.
                        LOG.warn("Interrupted while recovering ledger {}, failing {} remaining fragment(s).",
                                lId, fragmentsAfterThis + 1);
                        for (int i = 0; i <= fragmentsAfterThis; i++) {
                            ledgerFragmentsMcb.processResult(BKException.Code.InterruptedException, null, null);
                        }
                        return;
                    }
                }
            }
            if (dryrun) {
                ledgerIterCb.processResult(BKException.Code.OK, null, null);
            }
        }
    }, null);
}