in iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsIotHubConnection.java [230:330]
public void open() throws TransportException
{
log.debug("Opening amqp layer...");
connectionId = UUID.randomUUID().toString();
this.savedException = null;
if (this.state == IotHubConnectionStatus.DISCONNECTED)
{
for (ClientConfiguration clientConfig : clientConfigurations)
{
this.addSessionHandler(clientConfig);
}
initializeStateLatches();
try
{
this.openAsync();
if (this.authenticationType == ClientConfiguration.AuthType.SAS_TOKEN)
{
// x509 authenticated connections don't open authentication links since the SSL handshake does all the authentication
log.trace("Waiting for authentication links to open...");
}
Iterator<ClientConfiguration> configsIterator = this.clientConfigurations.iterator();
ClientConfiguration defaultConfig = configsIterator.hasNext() ? configsIterator.next() : null;
int timeoutSeconds = ClientConfiguration.DEFAULT_AMQP_OPEN_AUTHENTICATION_SESSION_TIMEOUT_IN_SECONDS;
if (defaultConfig != null)
{
timeoutSeconds = defaultConfig.getAmqpOpenAuthenticationSessionTimeout();
}
boolean authenticationSessionOpenTimedOut = !this.authenticationSessionOpenedLatch.await(timeoutSeconds, TimeUnit.SECONDS);
if (this.savedException != null)
{
throw this.savedException;
}
if (authenticationSessionOpenTimedOut)
{
closeConnectionWithException("Timed out waiting for authentication session to open", true);
}
log.trace("Waiting for device sessions to open...");
boolean deviceSessionsOpenTimedOut = false;
for (ClientConfiguration config : this.clientConfigurations)
{
//Each device has its own worker session timeout according to its config settings
deviceSessionsOpenTimedOut = !this.deviceSessionsOpenedLatches.get(config.getDeviceId()).await(config.getAmqpOpenDeviceSessionsTimeout(), TimeUnit.SECONDS);
if (deviceSessionsOpenTimedOut)
{
// If any device session times out while opening, don't wait for the others
break;
}
}
if (this.savedException != null)
{
throw this.savedException;
}
if (deviceSessionsOpenTimedOut)
{
closeConnectionWithException("Timed out waiting for worker links to open", true);
}
}
catch (TransportException e)
{
// since some session handlers were created and saved locally, they must be deleted so that the next open
// call doesn't add new session handlers on top of a list of the same session handlers from the last attempt.
clearLocalState();
// clean up network resources and thread scheduler before exiting this layer. Subsequent open attempts
// will create a new reactor and a new executor service
closeNetworkResources();
throw e;
}
catch (InterruptedException e)
{
// since some session handlers were created and saved locally, they must be deleted so that the next open
// call doesn't add new session handlers on top of a list of the same session handlers from the last attempt.
clearLocalState();
// clean up network resources and thread scheduler before exiting this layer. Subsequent open attempts
// will create a new reactor and a new executor service
closeNetworkResources();
TransportException interruptedTransportException = new TransportException("Interrupted while waiting for links to open for AMQP connection", e);
interruptedTransportException.setRetryable(true);
throw interruptedTransportException;
}
}
this.state = IotHubConnectionStatus.CONNECTED;
this.listener.onConnectionEstablished(this.connectionId);
log.debug("Amqp connection opened successfully");
}