private void recoverLedger()

in bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java [807:1011]


    /**
     * Re-replicates every fragment of ledger {@code lId} whose ensemble contains one of
     * {@code bookiesSrc}, writing the data to replacement bookies.
     *
     * <p>The ledger is first opened without recovery. If it is not closed and its last ensemble
     * still contains a source bookie (and this is not a dry run), the handle is closed, the
     * ledger is re-opened with recovery so that it gets closed, and this method is re-invoked on
     * {@code bkc.mainWorkerPool} to start over against the now-closed ledger.
     *
     * @param bookiesSrc bookies whose copies of the ledger data must be replaced
     * @param lId id of the ledger to recover
     * @param dryrun when true, only log (via {@code VERBOSE}) the fragments and the old/new
     *     ensembles that would be used; no fencing and no replication is performed
     * @param skipOpenLedgers when true, a ledger whose metadata state is {@code OPEN} is skipped
     *     and reported as {@code OK}
     * @param skipUnrecoverableLedgers when true, open/recovery failures are logged and reported
     *     to {@code finalLedgerIterCb} as {@code OK} instead of as the failure code
     * @param finalLedgerIterCb completion callback, invoked once this ledger has been handled with
     *     {@code BKException.Code.OK} or a failure code
     */
    private void recoverLedger(final Set<BookieId> bookiesSrc, final long lId, final boolean dryrun,
                               final boolean skipOpenLedgers, final boolean skipUnrecoverableLedgers,
                               final AsyncCallback.VoidCallback finalLedgerIterCb) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Recovering ledger : {}", lId);
        }

        asyncOpenLedgerNoRecovery(lId, new OpenCallback() {
            @Override
            public void openComplete(int rc, final LedgerHandle lh, Object ctx) {
                if (rc != BKException.Code.OK) {
                    if (skipUnrecoverableLedgers) {
                        LOG.warn("BK error opening ledger: {}, skip recover it.", lId, BKException.create(rc));
                        finalLedgerIterCb.processResult(BKException.Code.OK, null, null);
                    } else {
                        LOG.error("BK error opening ledger: {}", lId, BKException.create(rc));
                        finalLedgerIterCb.processResult(rc, null, null);
                    }
                    return;
                }

                LedgerMetadata lm = lh.getLedgerMetadata();
                if (skipOpenLedgers && lm.getState() == LedgerMetadata.State.OPEN) {
                    LOG.info("Skip recovering open ledger {}.", lId);
                    try {
                        lh.close();
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    } catch (BKException bke) {
                        LOG.warn("Error on closing ledger handle for {}.", lId, bke);
                    }
                    finalLedgerIterCb.processResult(BKException.Code.OK, null, null);
                    return;
                }

                final boolean fenceRequired = !lm.isClosed() && containBookiesInLastEnsemble(lm, bookiesSrc);
                // The original writer has not removed the faulty bookie from the current
                // ledger ensemble. To avoid data loss in the case of concurrent updates to the
                // ensemble composition, the recovery tool must first close the ledger.
                if (!dryrun && fenceRequired) {
                    // close the non-recovery ledger handle opened above
                    try {
                        lh.close();
                    } catch (Exception ie) {
                        LOG.warn("Error closing non recovery ledger handle for ledger {}", lId, ie);
                    }
                    asyncOpenLedger(lId, new OpenCallback() {
                        @Override
                        public void openComplete(int newrc, final LedgerHandle newlh, Object newctx) {
                            if (newrc != BKException.Code.OK) {
                                if (skipUnrecoverableLedgers) {
                                    LOG.warn("BK error opening ledger: {}, skip recover it.",
                                        lId, BKException.create(newrc));
                                    finalLedgerIterCb.processResult(BKException.Code.OK, null, null);
                                } else {
                                    LOG.error("BK error opening ledger with recovery: {}", lId,
                                        BKException.create(newrc));
                                    finalLedgerIterCb.processResult(newrc, null, null);
                                }
                                return;
                            }
                            bkc.mainWorkerPool.submit(() -> {
                                // The recovery handle was only needed to close the ledger; the
                                // recursive call opens its own handle, so release this one first.
                                try {
                                    newlh.close();
                                } catch (InterruptedException ie) {
                                    Thread.currentThread().interrupt();
                                } catch (BKException bke) {
                                    LOG.warn("Error on closing recovery ledger handle for {}.", lId, bke);
                                }
                                // do recovery
                                recoverLedger(bookiesSrc, lId, dryrun, skipOpenLedgers,
                                    skipUnrecoverableLedgers, finalLedgerIterCb);
                            });
                        }
                    }, null);
                    return;
                }

                // Per-ledger completion: maps failures according to skipUnrecoverableLedgers,
                // closes the handle and forwards the result to finalLedgerIterCb.
                final AsyncCallback.VoidCallback ledgerIterCb = new AsyncCallback.VoidCallback() {
                    @Override
                    public void processResult(int rc, String path, Object ctx) {
                        if (BKException.Code.OK != rc) {
                            if (skipUnrecoverableLedgers) {
                                LOG.warn("Failed to recover ledger: {} : {}, skip recover it.", lId,
                                    BKException.codeLogger(rc));
                                rc = BKException.Code.OK;
                            } else {
                                LOG.error("Failed to recover ledger {} : {}", lId, BKException.codeLogger(rc));
                            }
                        } else {
                            LOG.info("Recovered ledger {}.", lId);
                        }
                        try {
                            lh.close();
                        } catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                        } catch (BKException bke) {
                            LOG.warn("Error on closing ledger handle for {}.", lId, bke);
                        }
                        finalLedgerIterCb.processResult(rc, path, ctx);
                    }
                };

                // Read the ensembles once so that the fragment boundaries computed below and the
                // ensembles looked up when replicating come from the same metadata version.
                final Map<Long, ? extends List<BookieId>> ensembles = lh.getLedgerMetadata().getAllEnsembles();

                /*
                 * Start entry IDs of the ledger fragments to recover, i.e. the keys of the
                 * ensembles that contain one of bookiesSrc. ArrayList because it is walked
                 * by index below.
                 */
                final List<Long> ledgerFragmentsToRecover = new ArrayList<Long>();
                /*
                 * Start entry ID -> end entry ID of every ledger fragment. A fragment ends at
                 * the entry before the next ensemble starts; the last fragment ends at the
                 * handle's last add confirmed.
                 */
                final Map<Long, Long> ledgerFragmentsRange = new HashMap<Long, Long>();
                Long curEntryId = null;
                for (Map.Entry<Long, ? extends List<BookieId>> entry : ensembles.entrySet()) {
                    if (curEntryId != null) {
                        ledgerFragmentsRange.put(curEntryId, entry.getKey() - 1);
                    }
                    curEntryId = entry.getKey();
                    if (containBookies(entry.getValue(), bookiesSrc)) {
                        // This fragment has entries stored on a source bookie, so it must be recovered.
                        ledgerFragmentsToRecover.add(entry.getKey());
                    }
                }
                // Add the last ensemble too; otherwise, if the failed bookie is in the last
                // ensemble of a closed ledger, the entries of that ensemble would not be replicated.
                if (curEntryId != null) {
                    ledgerFragmentsRange.put(curEntryId, lh.getLastAddConfirmed());
                }
                // Nothing to re-replicate for this ledger.
                if (ledgerFragmentsToRecover.isEmpty()) {
                    ledgerIterCb.processResult(BKException.Code.OK, null, null);
                    return;
                }

                if (dryrun) {
                    VERBOSE.info("Recovered ledger {} : {}", lId, (fenceRequired ? "[fence required]" : ""));
                }

                final int fragmentCount = ledgerFragmentsToRecover.size();
                /*
                 * Multicallback for the ledger: once all fragments have reported back (one call
                 * per fragment), ledgerIterCb is triggered.
                 */
                MultiCallback ledgerFragmentsMcb = new MultiCallback(fragmentCount, ledgerIterCb,
                        null, BKException.Code.OK, BKException.Code.LedgerRecoveryException);
                // Recover all of the necessary ledger fragments asynchronously.
                for (int i = 0; i < fragmentCount; i++) {
                    final Long startEntryId = ledgerFragmentsToRecover.get(i);
                    Long endEntryId = ledgerFragmentsRange.get(startEntryId);
                    List<BookieId> ensemble = ensembles.get(startEntryId);
                    // Get bookies to replace
                    Map<Integer, BookieId> targetBookieAddresses;
                    try {
                        targetBookieAddresses = getReplacementBookies(lh, ensemble, bookiesSrc);
                    } catch (BKException.BKNotEnoughBookiesException e) {
                        if (!dryrun) {
                            ledgerFragmentsMcb.processResult(BKException.Code.NotEnoughBookiesException, null, null);
                        } else {
                            VERBOSE.info("  Fragment [{} - {}] : {}", startEntryId, endEntryId,
                                BKException.getMessage(BKException.Code.NotEnoughBookiesException));
                        }
                        continue;
                    }

                    if (dryrun) {
                        ArrayList<BookieId> newEnsemble =
                                replaceBookiesInEnsemble(ensemble, targetBookieAddresses);
                        VERBOSE.info("  Fragment [{} - {}] : ", startEntryId, endEntryId);
                        VERBOSE.info("    old ensemble : {}", formatEnsemble(ensemble, bookiesSrc, '*'));
                        VERBOSE.info("    new ensemble : {}", formatEnsemble(newEnsemble, bookiesSrc, '*'));
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Replicating fragment from [{}, {}] of ledger {} to {}",
                                startEntryId, endEntryId, lh.getId(), targetBookieAddresses);
                        }
                        try {
                            LedgerFragmentReplicator.SingleFragmentCallback cb =
                                new LedgerFragmentReplicator.SingleFragmentCallback(ledgerFragmentsMcb, lh,
                                                                                    bkc.getLedgerManager(),
                                        startEntryId, getReplacementBookiesMap(ensemble, targetBookieAddresses));
                            LedgerFragment ledgerFragment = new LedgerFragment(lh,
                                startEntryId, endEntryId, targetBookieAddresses.keySet());
                            asyncRecoverLedgerFragment(lh, ledgerFragment, cb,
                                Sets.newHashSet(targetBookieAddresses.values()), NOOP_BICONSUMER);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            // Submission stops here. The fragment that threw (presumably without
                            // reaching its callback) and every fragment not yet visited would never
                            // tick ledgerFragmentsMcb, leaving ledgerIterCb - and therefore the
                            // caller's finalLedgerIterCb - pending forever. Complete them with an
                            // error so the ledger is reported as failed and lh gets closed.
                            LOG.warn("Interrupted while recovering ledger {}, abandoning {} remaining fragment(s).",
                                lId, fragmentCount - i);
                            for (int j = i; j < fragmentCount; j++) {
                                ledgerFragmentsMcb.processResult(BKException.Code.InterruptedException, null, null);
                            }
                            return;
                        }
                    }
                }
                if (dryrun) {
                    ledgerIterCb.processResult(BKException.Code.OK, null, null);
                }
            }
            }, null);
    }