public void operationComplete()

in bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java [2601:2692]


        public void operationComplete(ChannelFuture future) {
            // Completion callback for a channel future of the enclosing PerChannelBookieClient.
            //
            // Steps, in order:
            //   1. Record the elapsed time since startTime in connectTimer as a success or a failure.
            //   2. Under the client lock, reconcile the future's outcome with the current connection
            //      state, updating 'state', 'channel' and the connection counters.
            //   3. Outside the lock, complete every queued pending operation with the resulting
            //      return code, then call makeWritable().
            //
            // Two paths return early WITHOUT completing pending operations:
            //   - the plain connect succeeded and TLS is being initiated (shFactory != null);
            //   - the client is already CONNECTED via another channel, so the new channel is closed.
            //
            // @param future the completed channel future; its channel() may be null for a dummy
            //               failed future (see the null check in the failure branch)
            if (LOG.isDebugEnabled()) {
                LOG.debug("Channel connected ({}) {}", future.isSuccess(), future.channel());
            }
            // Return code handed to the pending operations; assigned on every path that reaches the
            // callback loop below.
            int rc;
            Queue<GenericCallback<PerChannelBookieClient>> oldPendingOps;

            /* We fill in the timer based on whether the connect operation itself succeeded regardless of
             * whether there was a race */
            if (future.isSuccess()) {
                PerChannelBookieClient.this
                .connectTimer.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
            } else {
                PerChannelBookieClient.this
                .connectTimer.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
            }

            synchronized (PerChannelBookieClient.this) {
                if (future.isSuccess() && state == ConnectionState.CONNECTING && future.channel().isActive()) {
                    // Expected outcome of a connect attempt: adopt the new channel.
                    rc = BKException.Code.OK;
                    channel = future.channel();
                    if (shFactory != null) {
                        // TLS is to be initiated on this channel. Pending operations stay queued;
                        // NOTE(review): presumably this listener runs again in START_TLS state once
                        // initiateTLS() finishes -- confirm against initiateTLS().
                        LOG.info("Successfully connected to bookie: {} {} initiate TLS", bookieId, future.channel());
                        makeWritable();
                        initiateTLS();
                        return;
                    } else {
                        LOG.info("Successfully connected to bookie: {} {}", bookieId, future.channel());
                        state = ConnectionState.CONNECTED;
                        activeNonTlsChannelCounter.inc();
                    }
                } else if (future.isSuccess() && state == ConnectionState.START_TLS) {
                    rc = BKException.Code.OK;
                    // Parameterized logging, consistent with the other log statements in this method.
                    LOG.info("Successfully connected to bookie using TLS: {}", bookieId);

                    state = ConnectionState.CONNECTED;
                    // Tell the client-side auth provider that the protocol was upgraded.
                    AuthHandler.ClientSideHandler authHandler = future.channel().pipeline()
                            .get(AuthHandler.ClientSideHandler.class);
                    authHandler.authProvider.onProtocolUpgrade();
                    activeTlsChannelCounter.inc();
                } else if (future.isSuccess() && (state == ConnectionState.CLOSED
                    || state == ConnectionState.DISCONNECTED)) {
                    // The client was closed/disconnected while the connect was in flight: discard the
                    // freshly connected channel and fail the pending operations.
                    LOG.warn("Closed before connection completed, clean up: {}, current state {}",
                            future.channel(), state);
                    closeChannel(future.channel());
                    rc = BKException.Code.BookieHandleNotAvailableException;
                    channel = null;
                } else if (future.isSuccess() && state == ConnectionState.CONNECTED) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Already connected with another channel({}), so close the new channel({})", channel,
                                future.channel());
                    }
                    closeChannel(future.channel());
                    return; // pendingOps should have been completed when other channel connected
                } else {
                    // Everything else: the future failed, or it succeeded in a state/channel
                    // combination not handled above (e.g. CONNECTING with an inactive channel).
                    // NOTE(review): in the latter case 'cause' is presumably null; the instanceof
                    // checks are null-safe, so the stack-trace branch is taken.
                    Throwable cause = future.cause();
                    if (cause instanceof UnknownHostException || cause instanceof NativeIoException) {
                        // Don't log stack trace for common errors
                        logBookieUnavailable(() -> LOG.warn("Could not connect to bookie: {}/{}, current state {} : {}",
                                future.channel(), bookieId, state, cause.getMessage()));
                    } else {
                        // Regular exceptions, include stack trace
                        logBookieUnavailable(() -> LOG.error("Could not connect to bookie: {}/{}, current state {} : ",
                                future.channel(), bookieId, state, cause));
                    }

                    rc = BKException.Code.BookieHandleNotAvailableException;
                    Channel failedChannel = future.channel();
                    if (failedChannel != null) { // can be null in case of dummy failed ChannelFuture
                        closeChannel(failedChannel);
                    }
                    channel = null;
                    // CLOSED is left untouched; any other state is moved to DISCONNECTED.
                    if (state != ConnectionState.CLOSED) {
                        state = ConnectionState.DISCONNECTED;
                    }
                    failedConnectionCounter.inc();
                }

                // trick to not do operations under the lock, take the list
                // of pending ops and assign it to a new variable, while
                // emptying the pending ops by just assigning it to a new
                // list
                oldPendingOps = pendingOps;
                pendingOps = new ArrayDeque<>();
            }

            // Callbacks run outside the lock, each receiving the return code decided above.
            for (GenericCallback<PerChannelBookieClient> pendingOp : oldPendingOps) {
                pendingOp.operationComplete(rc, PerChannelBookieClient.this);
            }

            makeWritable();
        }