in bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java [2202:2298]
/**
 * Replaces failed bookies in the ledger's last ensemble by updating the ledger metadata,
 * then either starts another round for failures collected in the meantime or resends the
 * writes against the new ensemble.
 *
 * <p>Each call bumps {@code numEnsembleChanges} and {@code ensembleChangeCounter}. If the
 * resulting change id exceeds the configured {@code maxAllowedEnsembleChanges}, the method
 * reports {@code WriteException} through {@code handleUnrecoverableErrorDuringAdd} and
 * returns without touching the metadata.
 *
 * <p>The completion handler runs on the thread picked from the main worker pool for this
 * ledger id and dispatches on the outcome:
 * <ul>
 *   <li>exception: reported via {@code handleUnrecoverableErrorDuringAdd} with the code
 *       derived from the exception (falling back to {@code WriteException});</li>
 *   <li>metadata closed: reported as {@code LedgerClosedException};</li>
 *   <li>metadata in recovery: reported as {@code LedgerFencedException};</li>
 *   <li>otherwise: under {@code metadataLock}, pending {@code delayedWriteFailedBookies}
 *       are drained and handled by a recursive call; if there are none,
 *       {@code changingEnsemble} is cleared and writes are resent.</li>
 * </ul>
 *
 * @param origEnsemble the ensemble that the final ensemble is diffed against when resending writes
 * @param failedBookies bookies to replace, keyed by their position in the ensemble
 */
void ensembleChangeLoop(List<BookieId> origEnsemble, Map<Integer, BookieId> failedBookies) {
    final int changeId = numEnsembleChanges.incrementAndGet();
    ensembleChangeCounter.inc();
    final String ctx = String.format("[EnsembleChange(ledger:%d, change-id:%010d)]", ledgerId, changeId);

    // Guard: when the ensemble changes are too frequent, stop and report the add as failed.
    if (changeId > clientCtx.getConf().maxAllowedEnsembleChanges) {
        LOG.info("{} reaches max allowed ensemble change number {}",
                ctx, clientCtx.getConf().maxAllowedEnsembleChanges);
        handleUnrecoverableErrorDuringAdd(WriteException);
        return;
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("{} Replacing {} in {}", ctx, failedBookies, origEnsemble);
    }

    // Counts invocations of the transform below; only used to tag log lines.
    final AtomicInteger attemptCounter = new AtomicInteger(0);

    new MetadataUpdateLoop(
            clientCtx.getLedgerManager(), getId(),
            this::getVersionedLedgerMetadata,
            // Update is needed only while the ledger is OPEN and at least one failed bookie
            // still sits at its position in the last ensemble.
            (current) -> {
                if (current.getState() != LedgerMetadata.State.OPEN) {
                    return false;
                }
                for (Map.Entry<Integer, BookieId> failed : failedBookies.entrySet()) {
                    if (LedgerMetadataUtils.getLastEnsembleValue(current)
                            .get(failed.getKey()).equals(failed.getValue())) {
                        return true;
                    }
                }
                return false;
            },
            // Build the metadata carrying the replacement ensemble.
            (current) -> {
                attemptCounter.incrementAndGet();

                List<BookieId> before = getCurrentEnsemble();
                List<BookieId> after = EnsembleUtils.replaceBookiesInEnsemble(
                        clientCtx.getBookieWatcher(), current, before, failedBookies, ctx);
                Long lastKey = LedgerMetadataUtils.getLastEnsembleKey(current);
                LedgerMetadataBuilder metadataBuilder = LedgerMetadataBuilder.from(current);
                // The new ensemble takes effect right after the last confirmed entry.
                long startEntry = getLastAddConfirmed() + 1;
                checkState(lastKey <= startEntry,
                        "New ensemble must either replace the last ensemble, or add a new one");

                if (LOG.isDebugEnabled()) {
                    LOG.debug("{}[attempt:{}] changing ensemble from: {} to: {} starting at entry: {}",
                            ctx, attemptCounter.get(), before, after, startEntry);
                }

                // Same start entry as the last ensemble: overwrite it; otherwise append a new one.
                return lastKey.equals(startEntry)
                        ? metadataBuilder.replaceEnsembleEntry(startEntry, after).build()
                        : metadataBuilder.newEnsembleEntry(startEntry, after).build();
            },
            this::setLedgerMetadata)
            .run().whenCompleteAsync((written, failure) -> {
                if (failure != null) {
                    LOG.warn("{}[attempt:{}] Exception changing ensemble", ctx, attemptCounter.get(), failure);
                    handleUnrecoverableErrorDuringAdd(BKException.getExceptionCode(failure, WriteException));
                    return;
                }

                if (written.getValue().isClosed()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("{}[attempt:{}] Metadata closed during attempt to replace bookie."
                                + " Another client must have recovered the ledger.", ctx, attemptCounter.get());
                    }
                    handleUnrecoverableErrorDuringAdd(BKException.Code.LedgerClosedException);
                    return;
                }

                if (written.getValue().getState() == LedgerMetadata.State.IN_RECOVERY) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("{}[attempt:{}] Metadata marked as in-recovery during attempt to replace bookie."
                                + " Another client must be recovering the ledger.", ctx, attemptCounter.get());
                    }
                    handleUnrecoverableErrorDuringAdd(BKException.Code.LedgerFencedException);
                    return;
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("{}[attempt:{}] Success updating metadata.", ctx, attemptCounter.get());
                }

                Map<Integer, BookieId> pendingFailures = null;
                List<BookieId> settledEnsemble = null;
                Set<Integer> diff = null;
                // Decide the follow-up under the lock, but perform it after releasing the lock.
                synchronized (metadataLock) {
                    if (delayedWriteFailedBookies.isEmpty()) {
                        settledEnsemble = getCurrentEnsemble();
                        diff = EnsembleUtils.diffEnsemble(origEnsemble, settledEnsemble);
                        LOG.info("New Ensemble: {} for ledger: {}", settledEnsemble, ledgerId);
                        changingEnsemble = false;
                    } else {
                        // Failures recorded while this change was running: snapshot and reset them.
                        pendingFailures = new HashMap<>(delayedWriteFailedBookies);
                        delayedWriteFailedBookies.clear();
                    }
                }

                if (pendingFailures != null && !pendingFailures.isEmpty()) {
                    ensembleChangeLoop(origEnsemble, pendingFailures);
                }
                if (settledEnsemble != null) { // unsetSuccess outside of lock
                    unsetSuccessAndSendWriteRequest(settledEnsemble, diff);
                }
            }, clientCtx.getMainWorkerPool().chooseThread(ledgerId));
}