in client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java [113:206]
public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws QpidException
{
if (_logger.isDebugEnabled())
{
_logger.debug("Connecting to broker:" + brokerDetail);
}
final Set<AMQState> openOrClosedStates =
EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
ConnectionSettings settings = brokerDetail.buildConnectionSettings();
//Check connection-level ssl override setting
String connectionSslOption = _conn.getConnectionURL().getOption(ConnectionURL.OPTIONS_SSL);
if(connectionSslOption != null)
{
boolean connUseSsl = Boolean.parseBoolean(connectionSslOption);
boolean brokerlistUseSsl = settings.isUseSSL();
if( connUseSsl != brokerlistUseSsl)
{
settings.setUseSSL(connUseSsl);
if (_logger.isDebugEnabled())
{
_logger.debug("Applied connection ssl option override, setting UseSsl to: " + connUseSsl );
}
}
}
SecurityLayer securityLayer = SecurityLayerFactory.newInstance(settings);
IoNetworkTransport transport = new IoNetworkTransport();
ReceiverClosedWaiter monitoringReceiver = new ReceiverClosedWaiter(securityLayer.receiver(_conn.getProtocolHandler()));
NetworkConnection network = transport.connect(settings, monitoringReceiver,
_conn.getProtocolHandler());
try
{
_conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender()));
StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates);
_conn.getProtocolHandler().init(settings);
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
AMQState state = waiter.await();
if (state == AMQState.CONNECTION_OPEN)
{
_conn.getFailoverPolicy().attainedConnection();
_conn.setConnected(true);
_conn.logConnected(network.getLocalAddress(), network.getRemoteAddress());
_messageCompressionSupported =
checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED);
_virtualhostPropertiesSupported =
checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_VIRTUALHOST_PROPERTIES_SUPPORTED);
_queueLifetimeSupported =
checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_QUEUE_LIFETIME_SUPPORTED);
_confirmedPublishSupported =
checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED);
_confirmedPublishNonTransactionalSupported = checkConfirmedPublishNonTransactionalSupported();
_conn.setConnectionSettings(settings);
return null;
}
else
{
return _conn.getProtocolHandler().getSuggestedProtocolVersion();
}
}
catch(QpidException | RuntimeException e)
{
network.close();
throw e;
}
finally
{
// await the receiver to finish its execution (and so the IO threads too)
if (!_conn.isConnected())
{
boolean closedWithinTimeout = monitoringReceiver.awaitClose(_timeout);
if (!closedWithinTimeout)
{
_logger.warn("Timed-out waiting for receiver for connection to "
+ brokerDetail + " to be closed.");
}
}
}
}