void ensembleChangeLoop()

in bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java [2202:2298]


    /**
     * Replaces the given failed bookies in the ledger's last ensemble by updating the ledger
     * metadata, then resumes writing with the resulting ensemble.
     *
     * <p>Each invocation increments {@code numEnsembleChanges} and {@code ensembleChangeCounter}.
     * Once the resulting change id exceeds {@code maxAllowedEnsembleChanges},
     * {@code handleUnrecoverableErrorDuringAdd(WriteException)} is invoked and no metadata update
     * is attempted. This method re-enters itself when {@code delayedWriteFailedBookies} is
     * non-empty after a successful update, so those follow-up changes also count toward the limit.
     *
     * <p>The metadata update is driven by a {@code MetadataUpdateLoop}. Its outcome is handled
     * asynchronously on the main worker pool thread chosen for {@code ledgerId}:
     * <ul>
     *   <li>update completed exceptionally: {@code handleUnrecoverableErrorDuringAdd} is called
     *       with the code extracted from the exception (falling back to {@code WriteException});</li>
     *   <li>resulting metadata is closed: called with {@code LedgerClosedException};</li>
     *   <li>resulting metadata is {@code IN_RECOVERY}: called with {@code LedgerFencedException};</li>
     *   <li>otherwise, under {@code metadataLock}: if {@code delayedWriteFailedBookies} is
     *       non-empty it is drained and another ensemble change is started. If it is empty,
     *       {@code changingEnsemble} is cleared and {@code unsetSuccessAndSendWriteRequest} is
     *       called (outside the lock) with the new ensemble and the replaced positions.</li>
     * </ul>
     *
     * @param origEnsemble the ensemble in use before the change; diffed against the final
     *                     ensemble to compute which positions were replaced
     * @param failedBookies failed bookies, keyed by their index in the last ensemble
     */
    void ensembleChangeLoop(List<BookieId> origEnsemble, Map<Integer, BookieId> failedBookies) {
        // Every call, including re-entries for delayed failures, gets a fresh change id.
        int ensembleChangeId = numEnsembleChanges.incrementAndGet();
        ensembleChangeCounter.inc();
        String logContext = String.format("[EnsembleChange(ledger:%d, change-id:%010d)]", ledgerId, ensembleChangeId);

        // When ensemble changes are too frequent, stop changing and fail through the
        // unrecoverable-add path (the change id counts every call, see above).
        if (ensembleChangeId > clientCtx.getConf().maxAllowedEnsembleChanges) {
            LOG.info("{} reaches max allowed ensemble change number {}",
                     logContext, clientCtx.getConf().maxAllowedEnsembleChanges);
            handleUnrecoverableErrorDuringAdd(WriteException);
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} Replacing {} in {}", logContext, failedBookies, origEnsemble);
        }

        // Counts invocations of the metadata transform below, and is reported in log lines.
        // A mutable holder is used because it is updated from inside a lambda.
        AtomicInteger attempts = new AtomicInteger(0);
        new MetadataUpdateLoop(
                clientCtx.getLedgerManager(), getId(),
                // Supplies the metadata the update starts from.
                this::getVersionedLedgerMetadata,
                // Needs-update check: the ledger must be OPEN, and at least one failed bookie must
                // still sit at its index in the last ensemble. NOTE(review): presumably the loop
                // finishes without writing once this is false; confirm against MetadataUpdateLoop.
                (metadata) -> metadata.getState() == LedgerMetadata.State.OPEN
                        && failedBookies.entrySet().stream().anyMatch(
                                e -> LedgerMetadataUtils.getLastEnsembleValue(metadata)
                                             .get(e.getKey()).equals(e.getValue())),
                // Transform: produce metadata whose last ensemble has the failed bookies replaced.
                (metadata) -> {
                    attempts.incrementAndGet();

                    // Replacements are chosen starting from this handle's current ensemble.
                    List<BookieId> currentEnsemble = getCurrentEnsemble();
                    List<BookieId> newEnsemble = EnsembleUtils.replaceBookiesInEnsemble(
                            clientCtx.getBookieWatcher(), metadata, currentEnsemble, failedBookies, logContext);
                    Long lastEnsembleKey = LedgerMetadataUtils.getLastEnsembleKey(metadata);
                    LedgerMetadataBuilder builder = LedgerMetadataBuilder.from(metadata);
                    // The new ensemble takes effect at the first entry not yet confirmed.
                    long newEnsembleStartEntry = getLastAddConfirmed() + 1;
                    checkState(lastEnsembleKey <= newEnsembleStartEntry,
                               "New ensemble must either replace the last ensemble, or add a new one");
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("{}[attempt:{}] changing ensemble from: {} to: {} starting at entry: {}",
                                  logContext, attempts.get(), currentEnsemble, newEnsemble, newEnsembleStartEntry);
                    }

                    // If the last ensemble already starts at that entry, none of its entries have
                    // been confirmed, so overwrite it. Otherwise append a new ensemble starting there.
                    if (lastEnsembleKey.equals(newEnsembleStartEntry)) {
                        return builder.replaceEnsembleEntry(newEnsembleStartEntry, newEnsemble).build();
                    } else {
                        return builder.newEnsembleEntry(newEnsembleStartEntry, newEnsemble).build();
                    }
                },
                // Hands the resulting metadata to this handle.
                this::setLedgerMetadata)
            // Run the loop. The outcome is handled on the main worker pool thread chosen for
            // ledgerId (the executor argument at the end of this call).
            .run().whenCompleteAsync((metadata, ex) -> {
                    if (ex != null) {
                        LOG.warn("{}[attempt:{}] Exception changing ensemble", logContext, attempts.get(), ex);
                        handleUnrecoverableErrorDuringAdd(BKException.getExceptionCode(ex, WriteException));
                    } else if (metadata.getValue().isClosed()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("{}[attempt:{}] Metadata closed during attempt to replace bookie."
                                      + " Another client must have recovered the ledger.", logContext, attempts.get());
                        }
                        handleUnrecoverableErrorDuringAdd(BKException.Code.LedgerClosedException);
                    } else if (metadata.getValue().getState() == LedgerMetadata.State.IN_RECOVERY) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("{}[attempt:{}] Metadata marked as in-recovery during attempt to replace bookie."
                                      + " Another client must be recovering the ledger.", logContext, attempts.get());
                        }

                        handleUnrecoverableErrorDuringAdd(BKException.Code.LedgerFencedException);
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("{}[attempt:{}] Success updating metadata.", logContext, attempts.get());
                        }

                        List<BookieId> newEnsemble = null;
                        Set<Integer> replaced = null;

                        Map<Integer, BookieId> toReplace = null;
                        // Under metadataLock, either drain bookies recorded in delayedWriteFailedBookies
                        // (presumably failures reported while this change was in flight; confirm
                        // against the add path), or finish the change by clearing changingEnsemble.
                        synchronized (metadataLock) {
                            if (!delayedWriteFailedBookies.isEmpty()) {
                                toReplace = new HashMap<>(delayedWriteFailedBookies);
                                delayedWriteFailedBookies.clear();
                            } else {
                                newEnsemble = getCurrentEnsemble();
                                replaced = EnsembleUtils.diffEnsemble(origEnsemble, newEnsemble);
                                LOG.info("New Ensemble: {} for ledger: {}", newEnsemble, ledgerId);

                                changingEnsemble = false;
                            }
                        }

                        // Only one of the two steps below can do work: toReplace is assigned only in
                        // the first branch above, newEnsemble only in the second.
                        // origEnsemble is passed again so that the eventual diff covers every
                        // replacement made since the first change.
                        if (toReplace != null && !toReplace.isEmpty()) {
                            ensembleChangeLoop(origEnsemble, toReplace);
                        }

                        if (newEnsemble != null) { // unsetSuccess outside of lock
                            unsetSuccessAndSendWriteRequest(newEnsemble, replaced);
                        }
                    }
            }, clientCtx.getMainWorkerPool().chooseThread(ledgerId));
    }