in bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java [381:515]
/**
 * Builds a client bound to a single bookie. The instance starts in
 * {@link ConnectionState#DISCONNECTED}; nothing in this constructor opens a channel.
 *
 * <p>Besides copying its collaborators into fields, the constructor:
 * <ul>
 *   <li>swaps the supplied event loop group for a fresh {@code DefaultEventLoopGroup}
 *       when {@code LocalBookiesRegistry} reports the bookie as local;</li>
 *   <li>initializes the security handler factory (when one is given) for the client role;</li>
 *   <li>registers all per-channel metrics under the channel scope, labelled with the bookie id;</li>
 *   <li>creates the {@link ClientConnectionPeer} view over the current {@code channel} field.</li>
 * </ul>
 *
 * @param conf client configuration; frame size, timeouts and protocol flags are read from it
 * @param executor executor stored for later use by this client
 * @param eventLoopGroup event loop group used unless the bookie is a local one
 * @param allocator buffer allocator, also handed to {@code shFactory.init}
 * @param bookieId id of the target bookie; also used as the stats label value
 * @param parentStatsLogger logger under which the channel scope is created
 * @param authProviderFactory authentication provider factory stored for later use
 * @param extRegistry extension registry stored for later use
 * @param pcbcPool pool stored for later use
 * @param shFactory security handler factory, may be {@code null}
 * @param bookieAddressResolver resolver stored for later use
 * @throws SecurityException declared by this constructor; presumably propagated from
 *         {@code shFactory.init} - confirm against SecurityHandlerFactory
 */
public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor,
        EventLoopGroup eventLoopGroup,
        ByteBufAllocator allocator,
        BookieId bookieId,
        StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory,
        ExtensionRegistry extRegistry,
        PerChannelBookieClientPool pcbcPool,
        SecurityHandlerFactory shFactory,
        BookieAddressResolver bookieAddressResolver) throws SecurityException {
    // --- collaborators and identity ---
    this.maxFrameSize = conf.getNettyMaxFrameSizeBytes();
    this.conf = conf;
    this.bookieId = bookieId;
    this.bookieAddressResolver = bookieAddressResolver;
    this.executor = executor;
    // A bookie registered as local gets its own DefaultEventLoopGroup instead of the supplied group.
    this.eventLoopGroup = LocalBookiesRegistry.isLocalBookie(bookieId)
            ? new DefaultEventLoopGroup()
            : eventLoopGroup;
    this.allocator = allocator;
    this.state = ConnectionState.DISCONNECTED;

    // --- configuration-derived settings (entry/info timeouts: seconds -> nanoseconds) ---
    this.addEntryTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getAddEntryTimeout());
    this.readEntryTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getReadEntryTimeout());
    this.getBookieInfoTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getBookieInfoTimeout());
    this.startTLSTimeout = conf.getStartTLSTimeout();
    this.useV2WireProtocol = conf.getUseV2WireProtocol();
    this.preserveMdcForTaskExecution = conf.getPreserveMdcForTaskExecution();

    // --- authentication / security ---
    this.authProviderFactory = authProviderFactory;
    this.extRegistry = extRegistry;
    this.shFactory = shFactory;
    if (shFactory != null) {
        shFactory.init(NodeType.Client, conf, allocator);
    }

    // --- metrics: everything lives under the channel scope, labelled by bookie ---
    final StatsLogger channelScope = parentStatsLogger.scope(BookKeeperClientStats.CHANNEL_SCOPE);
    final StatsLogger stats =
            channelScope.scopeLabel(BookKeeperClientStats.BOOKIE_LABEL, bookieId.toString());
    this.statsLogger = stats;

    // operation latency loggers
    this.readEntryOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_OP);
    this.addEntryOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.CHANNEL_ADD_OP);
    this.writeLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.CHANNEL_WRITE_LAC_OP);
    this.forceLedgerOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.CHANNEL_FORCE_OP);
    this.readLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_LAC_OP);
    this.getBookieInfoOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.GET_BOOKIE_INFO_OP);
    this.getListOfEntriesOfLedgerCompletionOpLogger =
            stats.getOpStatsLogger(BookKeeperClientStats.GET_LIST_OF_ENTRIES_OF_LEDGER_OP);

    // timeout loggers
    this.readTimeoutOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ);
    this.addTimeoutOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_ADD);
    this.writeLacTimeoutOpLogger =
            stats.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_WRITE_LAC);
    this.forceLedgerTimeoutOpLogger =
            stats.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_FORCE);
    this.readLacTimeoutOpLogger =
            stats.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ_LAC);
    this.getBookieInfoTimeoutOpLogger =
            stats.getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_BOOKIE_INFO);

    // TLS start-up loggers
    this.startTLSOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.CHANNEL_START_TLS_OP);
    this.startTLSTimeoutOpLogger =
            stats.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_START_TLS_OP);
    this.getListOfEntriesOfLedgerCompletionTimeoutOpLogger =
            stats.getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_LIST_OF_ENTRIES_OF_LEDGER);

    // connection-level counters and timers
    this.exceptionCounter = stats.getCounter(BookKeeperClientStats.NETTY_EXCEPTION_CNT);
    this.connectTimer = stats.getOpStatsLogger(BookKeeperClientStats.CLIENT_CONNECT_TIMER);
    this.addEntryOutstanding = stats.getCounter(BookKeeperClientStats.ADD_OP_OUTSTANDING);
    this.readEntryOutstanding = stats.getCounter(BookKeeperClientStats.READ_OP_OUTSTANDING);
    this.nettyOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.NETTY_OPS);
    this.activeNonTlsChannelCounter =
            stats.getCounter(BookKeeperClientStats.ACTIVE_NON_TLS_CHANNEL_COUNTER);
    this.activeTlsChannelCounter =
            stats.getCounter(BookKeeperClientStats.ACTIVE_TLS_CHANNEL_COUNTER);
    this.failedConnectionCounter =
            stats.getCounter(BookKeeperClientStats.FAILED_CONNECTION_COUNTER);
    this.failedTlsHandshakeCounter =
            stats.getCounter(BookKeeperClientStats.FAILED_TLS_HANDSHAKE_COUNTER);

    this.pcbcPool = pcbcPool;

    // View of this connection; each method takes a snapshot of the channel field
    // into a local before dereferencing it.
    this.connectionPeer = new ClientConnectionPeer() {
        /** Returns the remote address of the current channel, or {@code null} without a channel. */
        @Override
        public SocketAddress getRemoteAddr() {
            final Channel ch = channel;
            return ch == null ? null : ch.remoteAddress();
        }

        /**
         * Returns the peer certificates of the TLS session as principals, or an empty
         * collection when there is no channel, no {@link SslHandler}, no certificates,
         * or the peer is unverified.
         */
        @Override
        public Collection<Object> getProtocolPrincipals() {
            final Channel ch = channel;
            if (ch == null) {
                return Collections.emptyList();
            }
            final SslHandler sslHandler = ch.pipeline().get(SslHandler.class);
            if (sslHandler == null) {
                return Collections.emptyList();
            }
            final Certificate[] peerCerts;
            try {
                peerCerts = sslHandler.engine().getSession().getPeerCertificates();
            } catch (SSLPeerUnverifiedException err) {
                return Collections.emptyList();
            }
            if (peerCerts == null) {
                return Collections.emptyList();
            }
            final List<Object> principals = new ArrayList<>(peerCerts.length);
            Collections.addAll(principals, peerCerts);
            return principals;
        }

        /** Closes the current channel (if any), calls makeWritable() once the close completes, and logs. */
        @Override
        public void disconnect() {
            final Channel ch = channel;
            if (ch != null) {
                ch.close().addListener(closeFuture -> makeWritable());
            }
            LOG.info("authplugin disconnected channel {}", channel);
        }

        /** Records the principal this connection was authenticated as. */
        @Override
        public void setAuthorizedId(BookKeeperPrincipal principal) {
            authorizedId = principal;
            LOG.info("connection {} authenticated as {}", channel, principal);
        }

        /** Returns the principal stored by {@link #setAuthorizedId}. */
        @Override
        public BookKeeperPrincipal getAuthorizedId() {
            return authorizedId;
        }

        /** Returns {@code true} only when a channel exists and its pipeline holds an {@link SslHandler}. */
        @Override
        public boolean isSecure() {
            final Channel ch = channel;
            return ch != null && ch.pipeline().get(SslHandler.class) != null;
        }
    };
}