public ProtocolVersion makeBrokerConnection()

in client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java [113:206]


    public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws QpidException
    {
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Connecting to broker:" + brokerDetail);
        }
        final Set<AMQState> openOrClosedStates =
                EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);

        ConnectionSettings settings = brokerDetail.buildConnectionSettings();

        //Check connection-level ssl override setting
        String connectionSslOption = _conn.getConnectionURL().getOption(ConnectionURL.OPTIONS_SSL);
        if(connectionSslOption != null)
        {
            boolean connUseSsl = Boolean.parseBoolean(connectionSslOption);
            boolean brokerlistUseSsl = settings.isUseSSL();

            if( connUseSsl != brokerlistUseSsl)
            {
                settings.setUseSSL(connUseSsl);

                if (_logger.isDebugEnabled())
                {
                    _logger.debug("Applied connection ssl option override, setting UseSsl to: " + connUseSsl );
                }
            }
        }

        SecurityLayer securityLayer = SecurityLayerFactory.newInstance(settings);

        IoNetworkTransport transport = new IoNetworkTransport();

        ReceiverClosedWaiter monitoringReceiver = new ReceiverClosedWaiter(securityLayer.receiver(_conn.getProtocolHandler()));

        NetworkConnection network = transport.connect(settings, monitoringReceiver,
                                                      _conn.getProtocolHandler());

        try
        {
            _conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender()));

            StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates);
            _conn.getProtocolHandler().init(settings);

            // this blocks until the connection has been set up or when an error
            // has prevented the connection being set up

            AMQState state = waiter.await();

            if (state == AMQState.CONNECTION_OPEN)
            {
                _conn.getFailoverPolicy().attainedConnection();
                _conn.setConnected(true);
                _conn.logConnected(network.getLocalAddress(), network.getRemoteAddress());

                _messageCompressionSupported =
                        checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED);

                _virtualhostPropertiesSupported =
                        checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_VIRTUALHOST_PROPERTIES_SUPPORTED);
                _queueLifetimeSupported =
                        checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_QUEUE_LIFETIME_SUPPORTED);
                _confirmedPublishSupported =
                        checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED);
                _confirmedPublishNonTransactionalSupported = checkConfirmedPublishNonTransactionalSupported();
                _conn.setConnectionSettings(settings);
                return null;
            }
            else
            {
                return _conn.getProtocolHandler().getSuggestedProtocolVersion();
            }
        }
        catch(QpidException | RuntimeException e)
        {
            network.close();
            throw e;
        }
        finally
        {
            // await the receiver to finish its execution (and so the IO threads too)
            if (!_conn.isConnected())
            {
                boolean closedWithinTimeout = monitoringReceiver.awaitClose(_timeout);
                if (!closedWithinTimeout)
                {
                    _logger.warn("Timed-out waiting for receiver for connection to "
                                 + brokerDetail + " to be closed.");
                }
            }
        }

    }