public PerChannelBookieClient()

in bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java [381:515]


    public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor,
                                  EventLoopGroup eventLoopGroup,
                                  ByteBufAllocator allocator,
                                  BookieId bookieId,
                                  StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory,
                                  ExtensionRegistry extRegistry,
                                  PerChannelBookieClientPool pcbcPool,
                                  SecurityHandlerFactory shFactory,
                                  BookieAddressResolver bookieAddressResolver) throws SecurityException {
        this.maxFrameSize = conf.getNettyMaxFrameSizeBytes();
        this.conf = conf;
        this.bookieId = bookieId;
        this.bookieAddressResolver = bookieAddressResolver;
        this.executor = executor;
        if (LocalBookiesRegistry.isLocalBookie(bookieId)) {
            this.eventLoopGroup = new DefaultEventLoopGroup();
        } else {
            this.eventLoopGroup = eventLoopGroup;
        }
        this.allocator = allocator;
        this.state = ConnectionState.DISCONNECTED;
        this.addEntryTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getAddEntryTimeout());
        this.readEntryTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getReadEntryTimeout());
        this.getBookieInfoTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getBookieInfoTimeout());
        this.startTLSTimeout = conf.getStartTLSTimeout();
        this.useV2WireProtocol = conf.getUseV2WireProtocol();
        this.preserveMdcForTaskExecution = conf.getPreserveMdcForTaskExecution();

        this.authProviderFactory = authProviderFactory;
        this.extRegistry = extRegistry;
        this.shFactory = shFactory;
        if (shFactory != null) {
            shFactory.init(NodeType.Client, conf, allocator);
        }

        this.statsLogger = parentStatsLogger.scope(BookKeeperClientStats.CHANNEL_SCOPE)
            .scopeLabel(BookKeeperClientStats.BOOKIE_LABEL, bookieId.toString());

        readEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_OP);
        addEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_ADD_OP);
        writeLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_WRITE_LAC_OP);
        forceLedgerOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_FORCE_OP);
        readLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_LAC_OP);
        getBookieInfoOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.GET_BOOKIE_INFO_OP);
        getListOfEntriesOfLedgerCompletionOpLogger = statsLogger
                .getOpStatsLogger(BookKeeperClientStats.GET_LIST_OF_ENTRIES_OF_LEDGER_OP);
        readTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ);
        addTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_ADD);
        writeLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_WRITE_LAC);
        forceLedgerTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_FORCE);
        readLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ_LAC);
        getBookieInfoTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_BOOKIE_INFO);
        startTLSOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_START_TLS_OP);
        startTLSTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_START_TLS_OP);
        getListOfEntriesOfLedgerCompletionTimeoutOpLogger = statsLogger
                .getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_LIST_OF_ENTRIES_OF_LEDGER);
        exceptionCounter = statsLogger.getCounter(BookKeeperClientStats.NETTY_EXCEPTION_CNT);
        connectTimer = statsLogger.getOpStatsLogger(BookKeeperClientStats.CLIENT_CONNECT_TIMER);
        addEntryOutstanding = statsLogger.getCounter(BookKeeperClientStats.ADD_OP_OUTSTANDING);
        readEntryOutstanding = statsLogger.getCounter(BookKeeperClientStats.READ_OP_OUTSTANDING);
        nettyOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.NETTY_OPS);
        activeNonTlsChannelCounter = statsLogger.getCounter(BookKeeperClientStats.ACTIVE_NON_TLS_CHANNEL_COUNTER);
        activeTlsChannelCounter = statsLogger.getCounter(BookKeeperClientStats.ACTIVE_TLS_CHANNEL_COUNTER);
        failedConnectionCounter = statsLogger.getCounter(BookKeeperClientStats.FAILED_CONNECTION_COUNTER);
        failedTlsHandshakeCounter = statsLogger.getCounter(BookKeeperClientStats.FAILED_TLS_HANDSHAKE_COUNTER);

        this.pcbcPool = pcbcPool;

        this.connectionPeer = new ClientConnectionPeer() {

            @Override
            public SocketAddress getRemoteAddr() {
                Channel c = channel;
                if (c != null) {
                    return c.remoteAddress();
                } else {
                    return null;
                }
            }

            @Override
            public Collection<Object> getProtocolPrincipals() {
                Channel c = channel;
                if (c == null) {
                    return Collections.emptyList();
                }
                SslHandler ssl = c.pipeline().get(SslHandler.class);
                if (ssl == null) {
                    return Collections.emptyList();
                }
                try {
                    Certificate[] certificates = ssl.engine().getSession().getPeerCertificates();
                    if (certificates == null) {
                        return Collections.emptyList();
                    }
                    List<Object> result = new ArrayList<>();
                    result.addAll(Arrays.asList(certificates));
                    return result;
                } catch (SSLPeerUnverifiedException err) {
                     return Collections.emptyList();
                }
            }

            @Override
            public void disconnect() {
                Channel c = channel;
                if (c != null) {
                    c.close().addListener(x -> makeWritable());
                }
                LOG.info("authplugin disconnected channel {}", channel);
            }

            @Override
            public void setAuthorizedId(BookKeeperPrincipal principal) {
                authorizedId = principal;
                LOG.info("connection {} authenticated as {}", channel, principal);
            }

            @Override
            public BookKeeperPrincipal getAuthorizedId() {
                return authorizedId;
            }

            @Override
            public boolean isSecure() {
               Channel c = channel;
               if (c == null) {
                    return false;
               } else {
                    return c.pipeline().get(SslHandler.class) != null;
               }
            }

        };
    }