in bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java [263:387]
public synchronized void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) {
    // Handles the response to one write request issued for this add op.
    //   rc       - result code, compared against the BKException.Code constants
    //   ledgerId - ledger the entry belongs to (only used for logging here)
    //   entryId  - entry that was written (only used for logging here)
    //   addr     - bookie that produced this response
    //   ctx      - must be an Integer: the index of that bookie inside the ensemble
    // The method is synchronized, so the bookkeeping below runs under this op's monitor.
    final int bookieIndex = (Integer) ctx;
    pendingWriteRequests--;

    // A response from a bookie that no longer occupies this ensemble slot is dropped:
    // the ensemble has already changed, so a failure of this addr is immaterial.
    final boolean staleResponse = !ensemble.get(bookieIndex).equals(addr);
    if (staleResponse) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Write did not succeed: " + ledgerId + ", " + entryId + ". But we have already fixed it.");
        }
        return;
    }

    final boolean writeOk = (rc == BKException.Code.OK);

    // Every ack must be recorded, even when the op is already complete, because an ensemble
    // change can undo completion.
    boolean quorumReached = false;
    if (writeOk) {
        quorumReached = ackSet.completeBookieAndCheck(bookieIndex);
        addEntrySuccessBookies.add(ensemble.get(bookieIndex));
    }

    if (completed) {
        if (!writeOk) {
            // An error after the ack quorum was satisfied means the entry is under-replicated
            // right from creation; reflect that in the stats.
            clientCtx.getClientStats().getAddOpUrCounter().inc();
            if (!(clientCtx.getConf().disableEnsembleChangeFeature.isAvailable()
                    || clientCtx.getConf().delayEnsembleChange)) {
                lh.notifyWriteFailed(bookieIndex, addr);
            }
        }
        // The op can be marked completed without its success callback ever having fired, because
        // the completed flag is not reset when #unsetSuccessAndSendWriteRequest leaves the ack
        // quorum intact. Example, with entry x about to complete:
        //   1) entry x + k hits a failure; lh.handleBookieFailure raises blockAddCompletions to 1
        //      for the ensemble change.
        //   2) entry x receives all its responses and sets completed to true, but cannot send the
        //      success callback because blockAddCompletions is 1.
        //   3) the ensemble change finishes; lh unsets success for x..x+k, but since that does not
        //      break the ackSet constraint, #removeBookieAndCheck leaves completed set.
        //   4) the retried request on the new bookie completes and finds the op already completed.
        // Hence the success callbacks have to be triggered again from here.
        sendAddSuccessCallbacks();
        // The op is finished: ignore this response beyond this point, otherwise the error
        // handling further down could run against an already-completed op.
        maybeRecycle();
        return;
    }

    if (!writeOk) {
        // Every failure path ends the handling of this response.
        if (rc == BKException.Code.ClientClosedException) {
            // The bookie client is closed.
            lh.errorOutPendingAdds(rc);
        } else if (rc == BKException.Code.IllegalOpException) {
            // Illegal operation requested, e.g. an unsupported feature in the v2 protocol.
            lh.handleUnrecoverableErrorDuringAdd(rc);
        } else if (rc == BKException.Code.LedgerFencedException) {
            LOG.warn("Fencing exception on write: L{} E{} on {}",
                    ledgerId, entryId, addr);
            lh.handleUnrecoverableErrorDuringAdd(rc);
        } else if (rc == BKException.Code.UnauthorizedAccessException) {
            LOG.warn("Unauthorized access exception on write: L{} E{} on {}",
                    ledgerId, entryId, addr);
            lh.handleUnrecoverableErrorDuringAdd(rc);
        } else if (clientCtx.getConf().delayEnsembleChange) {
            // The failure is always registered with the ack set first; its result tells whether
            // the ack quorum can still be met.
            final boolean quorumBroken = ackSet.failBookieAndCheck(bookieIndex, addr);
            if (quorumBroken || rc == BKException.Code.WriteOnReadOnlyBookieException) {
                final Map<Integer, BookieId> bookiesToReplace = ackSet.getFailedBookies();
                LOG.warn("Failed to write entry ({}, {}) to bookies {}, handling failures.",
                        ledgerId, entryId, bookiesToReplace);
                // The ack quorum requirement cannot be met: trigger an ensemble change.
                lh.handleBookieFailure(bookiesToReplace);
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("Failed to write entry ({}, {}) to bookie ({}, {}),"
                                + " but it didn't break ack quorum, delaying ensemble change : {}",
                        ledgerId, entryId, bookieIndex, addr, BKException.getMessage(rc));
            }
        } else {
            // Ensemble changes are not delayed: report this single bookie right away.
            LOG.warn("Failed to write entry ({}, {}) to bookie ({}, {}): {}",
                    ledgerId, entryId, bookieIndex, addr, BKException.getMessage(rc));
            lh.handleBookieFailure(ImmutableMap.of(bookieIndex, addr));
        }
        return;
    }

    // Successful write: nothing more to do unless this ack completed the quorum.
    if (!quorumReached || completed) {
        return;
    }

    final boolean placementViolated = clientCtx.getConf().enforceMinNumFaultDomainsForWrite
            && !clientCtx.getPlacementPolicy().areAckedBookiesAdheringToPlacementPolicy(
                    addEntrySuccessBookies,
                    lh.getLedgerMetadata().getWriteQuorumSize(),
                    lh.getLedgerMetadata().getAckQuorumSize());
    if (placementViolated) {
        LOG.warn("Write success for entry ID {} delayed, not acknowledged by bookies in enough fault domains",
                entryId);
        // Count the write as not completed because of too few fault domains.
        clientCtx.getClientStats().getWriteDelayedDueToNotEnoughFaultDomains().inc();
        // Remember only the first time the write got delayed.
        if (writeDelayedStartTime == -1) {
            writeDelayedStartTime = MathUtils.nowInNano();
        }
        return;
    }

    completed = true;
    this.qwcLatency = MathUtils.elapsedNanos(requestTimeNanos);
    if (writeDelayedStartTime != -1) {
        // The write had been held back earlier; record how long the delay lasted.
        clientCtx.getClientStats()
                .getWriteDelayedDueToNotEnoughFaultDomainsLatency()
                .registerSuccessfulEvent(MathUtils.elapsedNanos(writeDelayedStartTime),
                        TimeUnit.NANOSECONDS);
    }
    sendAddSuccessCallbacks();
}