in iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsIotHubConnection.java [1138:1207]
private void checkForNewlyUnregisteredMultiplexedClientsToStop()
{
Iterator<ClientConfiguration> configsToUnregisterIterator = this.multiplexingClientsToUnregister.keySet().iterator();
ClientConfiguration configToUnregister = configsToUnregisterIterator.hasNext() ? configsToUnregisterIterator.next() : null;
Set<ClientConfiguration> configsUnregisteredSuccessfully = new HashSet<>();
while (configToUnregister != null)
{
String deviceId = configToUnregister.getDeviceId();
// Check if the device session still exists from a previous connection
AmqpsSessionHandler amqpsSessionHandler = this.sessionHandlers.get(deviceId);
// If a device session doesn't currently exist for this device identity
if (amqpsSessionHandler == null)
{
log.warn("Attempted to remove device session for device {} from multiplexed connection, but device was not currently registered.", deviceId);
}
else
{
log.trace("Removing session handler for device {}", deviceId);
this.sessionHandlers.remove(deviceId);
// if the client being unregistered is doing so for reconnection purposes
boolean isSessionReconnecting = this.multiplexingClientsToUnregister.get(configToUnregister);
if (isSessionReconnecting)
{
// save the session handler for later since it has state for what subscriptions the device had before this reconnection
this.reconnectingDeviceSessionHandlers.put(deviceId, amqpsSessionHandler);
}
else
{
// remove the cached session handler since the device is being unregistered manually if it is cached
this.reconnectingDeviceSessionHandlers.remove(deviceId);
}
// Need to find the sas token renewal handler that is tied to this device
AmqpsSasTokenRenewalHandler sasTokenRenewalHandlerToRemove = null;
for (AmqpsSasTokenRenewalHandler existingSasTokenRenewalHandler : this.sasTokenRenewalHandlers)
{
if (existingSasTokenRenewalHandler.amqpsSessionHandler.getDeviceId().equals(configToUnregister.getDeviceId()))
{
sasTokenRenewalHandlerToRemove = existingSasTokenRenewalHandler;
// Stop the sas token renewal handler from sending any more authentication messages on behalf of this device
log.trace("Closing sas token renewal handler for device {}", configToUnregister.getDeviceId());
sasTokenRenewalHandlerToRemove.close();
break;
}
}
if (sasTokenRenewalHandlerToRemove != null)
{
this.sasTokenRenewalHandlers.remove(sasTokenRenewalHandlerToRemove);
}
log.debug("Closing device session for multiplexed device {}", configToUnregister.getDeviceId());
amqpsSessionHandler.closeSession();
}
configsUnregisteredSuccessfully.add(configToUnregister);
configToUnregister = configsToUnregisterIterator.hasNext() ? configsToUnregisterIterator.next() : null;
}
for (ClientConfiguration successfullyUnregisteredConfig : configsUnregisteredSuccessfully)
{
this.multiplexingClientsToUnregister.remove(successfullyUnregisteredConfig);
}
this.clientConfigurations.removeAll(configsUnregisteredSuccessfully);
}