in iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java [1111:1186]
/**
 * Unregisters the given device clients from this multiplexed transport and cancels the messages they still
 * have queued.
 *
 * <p>The method runs in three phases:
 * <ol>
 *     <li>Each configuration is unregistered from the transport connection if one exists (otherwise its tracked
 *     connection state is dropped directly), and is removed from the known device client configurations.</li>
 *     <li>If the transport is not {@code DISCONNECTED}, the calling thread polls every 100 milliseconds until
 *     each unregistered device reports {@code DISCONNECTED}, then drops that device's tracked state.</li>
 *     <li>Every "waiting" and "in progress" packet that belongs to an unregistered device is removed, marked
 *     {@code MESSAGE_CANCELLED_ONCLOSE} and handed to the callback queue.</li>
 * </ol>
 *
 * @param configs the configurations of the device clients to unregister. A device that is listed more than once
 *                is waited on, and has its messages cancelled, only once.
 * @param timeoutMilliseconds the maximum time, in milliseconds, to spend waiting for the devices to disconnect.
 * @throws UnsupportedOperationException if the transport protocol is neither AMQPS nor AMQPS_WS.
 * @throws InterruptedException if the calling thread is interrupted while waiting for the devices to disconnect.
 * @throws IotHubClientException with {@code DEVICE_OPERATION_TIMED_OUT} if the devices did not all disconnect
 *                               within the timeout.
 */
public void unregisterMultiplexedDeviceClient(List<ClientConfiguration> configs, long timeoutMilliseconds) throws InterruptedException, IotHubClientException
{
    if (getProtocol() != IotHubClientProtocol.AMQPS && getProtocol() != IotHubClientProtocol.AMQPS_WS)
    {
        throw new UnsupportedOperationException("Cannot unregister a multiplexed device unless connection is over AMQPS or AMQPS_WS.");
    }

    // Distinct device ids in the order they were supplied. Collecting them once avoids re-scanning the config
    // list for every packet below and keeps a repeated config from being processed twice.
    // java.util types are fully qualified here so this method does not depend on the file's import list.
    java.util.Set<String> unregisteredDeviceIds = new java.util.LinkedHashSet<>();

    for (ClientConfiguration configToUnregister : configs)
    {
        if (this.iotHubTransportConnection != null)
        {
            // Safe cast since amqps and amqps_ws always use this transport connection type.
            ((AmqpsIotHubConnection) this.iotHubTransportConnection).unregisterMultiplexedDevice(configToUnregister, false);
        }
        else
        {
            // No connection to notify, so just forget the tracked state for this device.
            this.multiplexedDeviceConnectionStates.remove(configToUnregister.getDeviceId());
        }

        this.deviceClientConfigs.remove(configToUnregister.getDeviceId());
        unregisteredDeviceIds.add(configToUnregister.getDeviceId());
    }

    // If the multiplexed connection is active, block until all the unregistered devices have been disconnected.
    // Elapsed time is compared against the timeout (rather than precomputing "now + timeout") so that a very
    // large timeout cannot overflow into an immediate expiry.
    final long waitStartTime = System.currentTimeMillis();
    if (this.connectionStatus != IotHubConnectionStatus.DISCONNECTED)
    {
        for (String deviceId : unregisteredDeviceIds)
        {
            // A device with no tracked state has nothing left to wait for; treating it as already disconnected
            // avoids dereferencing a missing map entry.
            while (this.multiplexedDeviceConnectionStates.get(deviceId) != null
                && this.multiplexedDeviceConnectionStates.get(deviceId).getConnectionStatus() != IotHubConnectionStatus.DISCONNECTED)
            {
                //noinspection BusyWait
                Thread.sleep(100);

                boolean operationHasTimedOut = System.currentTimeMillis() - waitStartTime >= timeoutMilliseconds;
                if (operationHasTimedOut)
                {
                    throw new IotHubClientException(DEVICE_OPERATION_TIMED_OUT, "Timed out waiting for all device unregistrations to finish.");
                }
            }

            this.multiplexedDeviceConnectionStates.remove(deviceId);
        }
    }

    // When a client is unregistered, remove all "waiting" and "in progress" messages that it had queued.
    // Removal goes through the iterator so it is safe regardless of the backing collection type.
    java.util.Iterator<? extends IotHubTransportPacket> waitingPackets = this.waitingPacketsQueue.iterator();
    while (waitingPackets.hasNext())
    {
        IotHubTransportPacket waitingPacket = waitingPackets.next();
        if (unregisteredDeviceIds.contains(waitingPacket.getDeviceId()))
        {
            waitingPackets.remove();
            waitingPacket.setStatus(IotHubStatusCode.MESSAGE_CANCELLED_ONCLOSE);
            this.addToCallbackQueue(waitingPacket);
        }
    }

    synchronized (this.inProgressMessagesLock)
    {
        // Removing through the values() iterator also removes the corresponding key from the map.
        java.util.Iterator<? extends IotHubTransportPacket> inProgress = this.inProgressPackets.values().iterator();
        while (inProgress.hasNext())
        {
            IotHubTransportPacket cancelledPacket = inProgress.next();
            if (unregisteredDeviceIds.contains(cancelledPacket.getDeviceId()))
            {
                inProgress.remove();
                cancelledPacket.setStatus(IotHubStatusCode.MESSAGE_CANCELLED_ONCLOSE);
                this.addToCallbackQueue(cancelledPacket);
            }
        }
    }
}