public void connect()

in qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java [182:319]


    /**
     * Configures the transport, initiates the connection to the remote peer and then waits
     * on the connect request until it completes or the configured connect timeout elapses.
     * <p>
     * The work is done in this order:
     * <ol>
     *   <li>Reject the call if {@code serializer} is already set (connect was called before).</li>
     *   <li>Give the transport its thread factory, listener and max frame size.</li>
     *   <li>Apply any optional connection extensions found in the connection info's extension
     *       map: SSL context override, proxy handler supplier and HTTP header overrides.</li>
     *   <li>Call {@code transport.connect(...)} with an initialization task that wires up the
     *       Proton transport / connection and, when {@code saslLayer} is set, the SASL client.</li>
     *   <li>Schedule a pump of the Proton transport on the returned serializer.</li>
     *   <li>Wait on the connect request, bounded by the connect timeout unless it is
     *       {@code JmsConnectionInfo.INFINITE}.</li>
     * </ol>
     *
     * @param connectionInfo
     *      the connection information; supplies the extension map, the owning connection
     *      passed to each extension function, and the connect timeout in milliseconds.
     *
     * @throws ProviderException
     *      the declared failure type of this method. A timed wait that returns {@code false}
     *      results in a {@code ProviderOperationTimedOutException}. Failures captured while
     *      connecting are handed to the connect request; presumably {@code sync} rethrows
     *      them to the caller (TODO confirm against ProviderFuture).
     * @throws IllegalStateException
     *      if {@code serializer} is already non-null, meaning connect was called previously.
     */
    public void connect(final JmsConnectionInfo connectionInfo) throws ProviderException {
        // NOTE(review): by its name this rejects use of a closed or failed provider; the
        // helper is defined elsewhere in the class.
        checkClosedOrFailed();

        // A non-null serializer means a previous connect call already reached transport.connect.
        if (serializer != null) {
            throw new IllegalStateException("Connect cannot be called more than once");
        }

        // Tracks the outcome of this connect attempt; it is waited on at the end of the method.
        final ProviderFuture connectRequest = futureFactory.createFuture();

        // Configure Transport prior to initialization at which point configuration is set and
        // cannot be updated.  All further interaction should take place on the serializer for
        // thread safety.

        // Thread name embeds a per-provider sequence number and the remote scheme://host:port
        // so transport threads of different providers can be told apart.
        ThreadFactory transportThreadFactory = new QpidJMSThreadFactory(
                "AmqpProvider :(" + PROVIDER_SEQUENCE.incrementAndGet() + "):[" +
                remoteURI.getScheme() + "://" + remoteURI.getHost() + ":" + remoteURI.getPort() + "]", true);

        transport.setThreadFactory(transportThreadFactory);
        transport.setTransportListener(AmqpProvider.this);
        transport.setMaxFrameSize(maxFrameSize);

        // Optional extension: an SSLContext produced by the registered function for this
        // connection and remote location. Stays null when no such extension is registered.
        final SSLContext sslContextOverride;
        if (connectionInfo.getExtensionMap().containsKey(JmsConnectionExtensions.SSL_CONTEXT)) {
            sslContextOverride =
                (SSLContext) connectionInfo.getExtensionMap().get(
                    JmsConnectionExtensions.SSL_CONTEXT).apply(connectionInfo.getConnection(), transport.getRemoteLocation());
        } else {
            sslContextOverride = null;
        }

        // Optional extension: a proxy handler supplier. A null result from the extension
        // function leaves the transport options untouched.
        if (connectionInfo.getExtensionMap().containsKey(JmsConnectionExtensions.PROXY_HANDLER_SUPPLIER)) {
            Supplier<?> proxyHandlerSupplier = (Supplier<?>) connectionInfo.getExtensionMap().get(
                    JmsConnectionExtensions.PROXY_HANDLER_SUPPLIER).apply(connectionInfo.getConnection(), transport.getRemoteLocation());
            if (proxyHandlerSupplier != null) {
                transport.getTransportOptions().setProxyHandlerSupplier(proxyHandlerSupplier);
            }
        }

        // Optional extension: HTTP headers. Non-null results are merged into the headers
        // already held by the transport options (putAll), replacing entries with equal keys.
        if (connectionInfo.getExtensionMap().containsKey(JmsConnectionExtensions.HTTP_HEADERS_OVERRIDE)) {
            @SuppressWarnings({ "unchecked" })
            Map<String, String> headers = (Map<String, String>)
                connectionInfo.getExtensionMap().get(
                    JmsConnectionExtensions.HTTP_HEADERS_OVERRIDE).apply(connectionInfo.getConnection(), transport.getRemoteLocation());
            if (headers != null) {
                transport.getTransportOptions().getHttpHeaders().putAll(headers);
            }
        }

        try {
            // The task below is handed to the transport as its initialization routine; the
            // value returned by connect becomes the serializer used for all later work.
            // NOTE(review): when and on which thread the transport runs this task is decided
            // by the Transport implementation, which is not visible here.
            serializer = transport.connect(() -> {
                // Publish the connection info and pending request to the provider's fields.
                this.connectionInfo = connectionInfo;
                this.connectionRequest = connectRequest;

                protonTransport.setEmitFlowEventOnSend(false);

                try {
                    ((TransportInternal) protonTransport).setUseReadOnlyOutputBuffer(false);
                } catch (NoSuchMethodError nsme) {
                    // using a version at runtime where the optimisation isn't available, ignore
                    LOG.trace("Proton output buffer optimisation unavailable");
                }

                // Only a positive max frame size is applied; it is used for both the
                // advertised max frame size and the outbound frame size limit.
                if (getMaxFrameSize() > 0) {
                    protonTransport.setMaxFrameSize(getMaxFrameSize());
                    protonTransport.setOutboundFrameSizeLimit(getMaxFrameSize());
                }

                protonTransport.setChannelMax(getChannelMax());
                protonTransport.setIdleTimeout(idleTimeout);
                // Attach the Proton connection to the transport and register the collector
                // that receives the connection's events.
                protonTransport.bind(protonConnection);
                protonConnection.collect(protonCollector);

                if (saslLayer) {
                    Sasl sasl = protonTransport.sasl();
                    sasl.client();

                    // Remote hostname selection: no vhost configured -> use the remote URI
                    // host; empty vhost -> pass null to the SASL layer; otherwise use the
                    // configured vhost as-is.
                    String hostname = getVhost();
                    if (hostname == null) {
                        hostname = remoteURI.getHost();
                    } else if (hostname.isEmpty()) {
                        hostname = null;
                    }

                    sasl.setRemoteHostname(hostname);
                    // Each client-side SASL event is delegated to the authenticator and then
                    // followed by checkSaslAuthenticationState().
                    // NOTE(review): presumably that check completes or fails the connect
                    // request once authentication finishes, since nothing in this method
                    // signals success when saslLayer is true — confirm.
                    sasl.setListener(new SaslListener() {

                        @Override
                        public void onSaslMechanisms(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
                            authenticator.handleSaslMechanisms(sasl, transport);
                            checkSaslAuthenticationState();
                        }

                        @Override
                        public void onSaslChallenge(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
                            authenticator.handleSaslChallenge(sasl, transport);
                            checkSaslAuthenticationState();
                        }

                        @Override
                        public void onSaslOutcome(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
                            authenticator.handleSaslOutcome(sasl, transport);
                            checkSaslAuthenticationState();
                        }

                        @Override
                        public void onSaslInit(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
                            // Server only event
                        }

                        @Override
                        public void onSaslResponse(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
                            // Server only event
                        }
                    });

                    // The listener reads the authenticator field when a callback fires, so
                    // assigning it after registration works provided no SASL callback is
                    // delivered before this task finishes — NOTE(review): confirm.
                    authenticator = new AmqpSaslAuthenticator((remoteMechanisms) -> findSaslMechanism(remoteMechanisms));
                }
            }, sslContextOverride);

            // Once connected pump the transport to write the header and respond to any
            // data that arrived at connect such as pipelined Header etc
            serializer.execute(() -> pumpToProtonTransport());

            // Without a SASL layer nothing further is awaited here, so the request is
            // marked successful immediately.
            if (!saslLayer) {
                connectRequest.onSuccess();
            }
        } catch (Throwable t) {
            // Route every failure through the request rather than throwing directly, so the
            // sync below is the single place the outcome is observed.
            connectRequest.onFailure(ProviderExceptionSupport.createOrPassthroughFatal(t));
        }

        // Wait for the outcome: bounded by the connect timeout (milliseconds) unless it is
        // INFINITE. A false return from the timed sync is treated as a timeout.
        if (connectionInfo.getConnectTimeout() != JmsConnectionInfo.INFINITE) {
            if (!connectRequest.sync(connectionInfo.getConnectTimeout(), TimeUnit.MILLISECONDS)) {
                throw new ProviderOperationTimedOutException("Timed out while waiting to connect");
            }
        } else {
            connectRequest.sync();
        }
    }