public synchronized void writeComplete()

in bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java [263:387]


    /**
     * Callback invoked when a bookie responds to this pending add operation.
     *
     * <p>Records the ack (or failure) for the responding bookie, and either completes the
     * add once the ack quorum is satisfied, errors out the ledger handle on unrecoverable
     * codes, or triggers an ensemble change on recoverable bookie failures.
     *
     * <p>Synchronized: mutates shared per-op state ({@code pendingWriteRequests},
     * {@code ackSet}, {@code completed}, {@code addEntrySuccessBookies}) that other
     * bookies' responses and ensemble changes also touch.
     *
     * @param rc       response code from the bookie (a {@code BKException.Code} value)
     * @param ledgerId ledger the entry was written to
     * @param entryId  id of the entry that was written
     * @param addr     id of the bookie that responded
     * @param ctx      the bookie's index within the current write ensemble (boxed Integer)
     */
    public synchronized void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) {
        int bookieIndex = (Integer) ctx;
        --pendingWriteRequests;

        // Stale response guard: if the bookie at this index is no longer the one that
        // answered, an ensemble change has already replaced it, so this response
        // (success or failure) no longer matters.
        if (!ensemble.get(bookieIndex).equals(addr)) {
            // ensemble has already changed, failure of this addr is immaterial
            if (LOG.isDebugEnabled()) {
                LOG.debug("Write did not succeed: " + ledgerId + ", " + entryId + ". But we have already fixed it.");
            }
            return;
        }

        // must record all acks, even if complete (completion can be undone by an ensemble change)
        boolean ackQuorum = false;
        if (BKException.Code.OK == rc) {
            ackQuorum = ackSet.completeBookieAndCheck(bookieIndex);
            addEntrySuccessBookies.add(ensemble.get(bookieIndex));
        }

        if (completed) {
            if (rc != BKException.Code.OK) {
                // Got an error after satisfying AQ. This means we are under replicated at the create itself.
                // Update the stat to reflect it.
                clientCtx.getClientStats().getAddOpUrCounter().inc();
                // Notify the handle of the post-AQ failure unless ensemble changes are
                // disabled entirely or deliberately delayed by configuration.
                if (!clientCtx.getConf().disableEnsembleChangeFeature.isAvailable()
                        && !clientCtx.getConf().delayEnsembleChange) {
                    lh.notifyWriteFailed(bookieIndex, addr);
                }
            }
            // even the add operation is completed, but because we don't reset completed flag back to false when
            // #unsetSuccessAndSendWriteRequest doesn't break ack quorum constraint. we still have current pending
            // add op is completed but never callback. so do a check here to complete again.
            //
            // E.g. entry x is going to complete.
            //
            // 1) entry x + k hits a failure. lh.handleBookieFailure increases blockAddCompletions to 1, for ensemble
            //    change
            // 2) entry x receives all responses, sets completed to true but fails to send success callback because
            //    blockAddCompletions is 1
            // 3) ensemble change completed. lh unset success starting from x to x+k, but since the unset doesn't break
            //    ackSet constraint. #removeBookieAndCheck doesn't set completed back to false.
            // 4) so when the retry request on new bookie completes, it finds the pending op is already completed.
            //    we have to trigger #sendAddSuccessCallbacks
            //
            sendAddSuccessCallbacks();
            // I am already finished, ignore incoming responses.
            // otherwise, we might hit the following error handling logic, which might cause bad things.
            maybeRecycle();
            return;
        }

        // Not yet completed: classify the response code. OK falls through to the
        // quorum check below; unrecoverable codes error out; anything else is treated
        // as a (potentially recoverable) bookie failure.
        switch (rc) {
        case BKException.Code.OK:
            // continue
            break;
        case BKException.Code.ClientClosedException:
            // bookie client is closed.
            lh.errorOutPendingAdds(rc);
            return;
        case BKException.Code.IllegalOpException:
            // illegal operation requested, like using unsupported feature in v2 protocol
            lh.handleUnrecoverableErrorDuringAdd(rc);
            return;
        case BKException.Code.LedgerFencedException:
            // Another client fenced the ledger; no further writes can succeed.
            LOG.warn("Fencing exception on write: L{} E{} on {}",
                    ledgerId, entryId, addr);
            lh.handleUnrecoverableErrorDuringAdd(rc);
            return;
        case BKException.Code.UnauthorizedAccessException:
            LOG.warn("Unauthorized access exception on write: L{} E{} on {}",
                    ledgerId, entryId, addr);
            lh.handleUnrecoverableErrorDuringAdd(rc);
            return;
        default:
            // Any other failure: either defer the ensemble change until the ack quorum
            // is actually broken (delayEnsembleChange), or change the ensemble eagerly
            // for just this bookie.
            if (clientCtx.getConf().delayEnsembleChange) {
                // WriteOnReadOnlyBookieException forces an immediate change even if the
                // ack quorum is still intact — that bookie cannot accept further writes.
                if (ackSet.failBookieAndCheck(bookieIndex, addr)
                        || rc == BKException.Code.WriteOnReadOnlyBookieException) {
                    Map<Integer, BookieId> failedBookies = ackSet.getFailedBookies();
                    LOG.warn("Failed to write entry ({}, {}) to bookies {}, handling failures.",
                            ledgerId, entryId, failedBookies);
                    // we can't meet ack quorum requirement, trigger ensemble change.
                    lh.handleBookieFailure(failedBookies);
                } else if (LOG.isDebugEnabled()) {
                    LOG.debug("Failed to write entry ({}, {}) to bookie ({}, {}),"
                                    + " but it didn't break ack quorum, delaying ensemble change : {}",
                            ledgerId, entryId, bookieIndex, addr, BKException.getMessage(rc));
                }
            } else {
                LOG.warn("Failed to write entry ({}, {}) to bookie ({}, {}): {}",
                        ledgerId, entryId, bookieIndex, addr, BKException.getMessage(rc));
                lh.handleBookieFailure(ImmutableMap.of(bookieIndex, addr));
            }
            return;
        }

        // Ack quorum just reached with this response. (completed was checked above and we
        // returned if set; the !completed guard here appears defensive.)
        if (ackQuorum && !completed) {
            // Optionally withhold completion until the acked bookies span enough fault
            // domains per the placement policy (enforceMinNumFaultDomainsForWrite).
            if (clientCtx.getConf().enforceMinNumFaultDomainsForWrite
                && !(clientCtx.getPlacementPolicy()
                              .areAckedBookiesAdheringToPlacementPolicy(addEntrySuccessBookies,
                                                                        lh.getLedgerMetadata().getWriteQuorumSize(),
                                                                        lh.getLedgerMetadata().getAckQuorumSize()))) {
                LOG.warn("Write success for entry ID {} delayed, not acknowledged by bookies in enough fault domains",
                         entryId);
                // Increment to indicate write did not complete due to not enough fault domains
                clientCtx.getClientStats().getWriteDelayedDueToNotEnoughFaultDomains().inc();

                // Only do this for the first time.
                if (writeDelayedStartTime == -1) {
                    writeDelayedStartTime = MathUtils.nowInNano();
                }
            } else {
                completed = true;
                // Latency from request start to quorum-write completion.
                this.qwcLatency = MathUtils.elapsedNanos(requestTimeNanos);

                // If completion had been delayed waiting on fault domains, record how long.
                if (writeDelayedStartTime != -1) {
                    clientCtx.getClientStats()
                             .getWriteDelayedDueToNotEnoughFaultDomainsLatency()
                             .registerSuccessfulEvent(MathUtils.elapsedNanos(writeDelayedStartTime),
                                                      TimeUnit.NANOSECONDS);
                }

                sendAddSuccessCallbacks();
            }
        }
    }