in bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java [2601:2692]
/**
 * Handles completion of a connection-related {@link ChannelFuture}.
 *
 * <p>The method records the elapsed connect time on {@code connectTimer}, then, while holding the
 * monitor of the enclosing {@code PerChannelBookieClient}, updates {@code state} and {@code channel}
 * according to the outcome of the future and the current connection state. Unless it returns early,
 * it swaps out the queue of pending operations under the lock and completes each of them with the
 * resulting return code after the lock has been released.
 *
 * @param future the completed future; on failure its channel may be {@code null}
 */
public void operationComplete(ChannelFuture future) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Channel connected ({}) {}", future.isSuccess(), future.channel());
    }
    int rc;
    Queue<GenericCallback<PerChannelBookieClient>> oldPendingOps;
    /* We fill in the timer based on whether the connect operation itself succeeded regardless of
     * whether there was a race */
    if (future.isSuccess()) {
        PerChannelBookieClient.this
                .connectTimer.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
    } else {
        PerChannelBookieClient.this
                .connectTimer.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
    }
    synchronized (PerChannelBookieClient.this) {
        if (future.isSuccess() && state == ConnectionState.CONNECTING && future.channel().isActive()) {
            // Fresh connection established while we were still waiting for it.
            rc = BKException.Code.OK;
            channel = future.channel();
            if (shFactory != null) {
                LOG.info("Successfully connected to bookie: {} {} initiate TLS", bookieId, future.channel());
                makeWritable();
                initiateTLS();
                // Pending ops are deliberately left queued here; presumably they are completed when
                // this listener runs again in the START_TLS state - confirm against initiateTLS().
                return;
            } else {
                LOG.info("Successfully connected to bookie: {} {}", bookieId, future.channel());
                state = ConnectionState.CONNECTED;
                activeNonTlsChannelCounter.inc();
            }
        } else if (future.isSuccess() && state == ConnectionState.START_TLS) {
            rc = BKException.Code.OK;
            // Parameterized message, consistent with the other log statements in this method.
            LOG.info("Successfully connected to bookie using TLS: {}", bookieId);
            state = ConnectionState.CONNECTED;
            // NOTE(review): the pipeline lookup result is dereferenced without a null check;
            // assumes the client-side auth handler is always installed at this point - confirm.
            AuthHandler.ClientSideHandler authHandler = future.channel().pipeline()
                    .get(AuthHandler.ClientSideHandler.class);
            authHandler.authProvider.onProtocolUpgrade();
            activeTlsChannelCounter.inc();
        } else if (future.isSuccess() && (state == ConnectionState.CLOSED
                || state == ConnectionState.DISCONNECTED)) {
            // The client was closed/disconnected while the connect was in flight: discard the channel.
            LOG.warn("Closed before connection completed, clean up: {}, current state {}",
                    future.channel(), state);
            closeChannel(future.channel());
            rc = BKException.Code.BookieHandleNotAvailableException;
            channel = null;
        } else if (future.isSuccess() && state == ConnectionState.CONNECTED) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Already connected with another channel({}), so close the new channel({})", channel,
                        future.channel());
            }
            closeChannel(future.channel());
            return; // pendingOps should have been completed when other channel connected
        } else {
            // Reached when the future failed, and also when it succeeded but matched none of the
            // branches above (e.g. CONNECTING with an inactive channel); in the latter case the
            // cause is presumably null.
            Throwable cause = future.cause();
            if (cause instanceof UnknownHostException || cause instanceof NativeIoException) {
                // Don't log stack trace for common errors
                logBookieUnavailable(() -> LOG.warn("Could not connect to bookie: {}/{}, current state {} : {}",
                        future.channel(), bookieId, state, cause.getMessage()));
            } else {
                // Regular exceptions, include stack trace
                logBookieUnavailable(() -> LOG.error("Could not connect to bookie: {}/{}, current state {} : ",
                        future.channel(), bookieId, state, cause));
            }
            rc = BKException.Code.BookieHandleNotAvailableException;
            Channel failedChannel = future.channel();
            if (failedChannel != null) { // can be null in case of dummy failed ChannelFuture
                closeChannel(failedChannel);
            }
            channel = null;
            // A CLOSED client stays CLOSED; any other state is downgraded to DISCONNECTED.
            if (state != ConnectionState.CLOSED) {
                state = ConnectionState.DISCONNECTED;
            }
            failedConnectionCounter.inc();
        }
        // trick to not do operations under the lock, take the list
        // of pending ops and assign it to a new variable, while
        // emptying the pending ops by just assigning it to a new
        // list
        oldPendingOps = pendingOps;
        pendingOps = new ArrayDeque<>();
    }
    // Complete the callbacks outside the lock with the return code decided above.
    for (GenericCallback<PerChannelBookieClient> pendingOp : oldPendingOps) {
        pendingOp.operationComplete(rc, PerChannelBookieClient.this);
    }
    makeWritable();
}