public ScheduledExecutorService connect()

in qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java [127:207]


    public ScheduledExecutorService connect(final Runnable initRoutine, SSLContext sslContextOverride) throws IOException {
        if (closed.get()) {
            throw new IllegalStateException("Transport has already been closed");
        }

        if (listener == null) {
            throw new IllegalStateException("A transport listener must be set before connection attempts.");
        }

        TransportOptions transportOptions = getTransportOptions();

        EventLoopType eventLoopType = EventLoopType.valueOf(transportOptions);
        int sharedEventLoopThreads = transportOptions.getSharedEventLoopThreads();
        if (sharedEventLoopThreads > 0) {
            groupRef = sharedGroup(eventLoopType, sharedEventLoopThreads);
        } else {
            groupRef = unsharedGroup(eventLoopType, ioThreadfactory);
        }

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(groupRef.group());

        eventLoopType.createChannel(bootstrap);

        bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel connectedChannel) throws Exception {
                if (initRoutine != null) {
                    try {
                        initRoutine.run();
                    } catch (Throwable initError) {
                        LOG.warn("Error during initialization of channel from provided initialization routine");
                        connectionFailed(connectedChannel, IOExceptionSupport.create(initError));
                        throw initError;
                    }
                }
                configureChannel(connectedChannel);
            }
        });

        configureNetty(bootstrap, transportOptions);
        transportOptions.setSslContextOverride(sslContextOverride);

        ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort());
        future.addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    handleException(future.channel(), IOExceptionSupport.create(future.cause()));
                }
            }
        });

        try {
            connectLatch.await();
        } catch (InterruptedException ex) {
            LOG.debug("Transport connection was interrupted.");
            Thread.interrupted();
            failureCause = IOExceptionSupport.create(ex);
        }

        if (failureCause != null) {
            // Close out any Netty resources now as they are no longer needed.
            if (channel != null) {
                channel.close().syncUninterruptibly();
                channel = null;
            }

            throw failureCause;
        } else {
            // Connected, allow any held async error to fire now and close the transport.
            channel.eventLoop().execute(() -> {
                if (failureCause != null) {
                    channel.pipeline().fireExceptionCaught(failureCause);
                }
            });
        }
        // returning the channel's specific event loop: the overall event loop group may be multi-threaded
        return channel.eventLoop();
    }