in iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/MultiplexingClient.java [413:515]
/**
 * Registers the given device clients to this multiplexing client.
 *
 * <p>The call runs in two phases while holding this client's operation lock. First, every supplied
 * device client is validated. Only once all of them have passed validation are they modified: their
 * proxy settings are overwritten with this multiplexing client's proxy settings, they are marked as
 * multiplexed and they are pointed at this multiplexing client's device IO. A validation failure
 * therefore leaves all of the supplied device clients untouched.
 *
 * <p>Device clients whose device id is already present in this client's set of registered devices are
 * not sent to the device IO layer again; a debug message is logged for them instead.
 *
 * @param deviceClients the device clients to register. Must not be null. It is iterated exactly once.
 * @param timeoutMilliseconds the timeout, in milliseconds, that is forwarded to the device IO layer's
 * registration call. Must be greater than 0.
 * @throws NullPointerException if {@code deviceClients} is null.
 * @throws IllegalArgumentException if {@code timeoutMilliseconds} is less than or equal to 0.
 * @throws UnsupportedOperationException if any device client does not use SAS token authentication,
 * uses a different protocol or host name than this multiplexing client, was already opened without
 * being multiplexed, or if registering the supplied devices would take the number of multiplexed
 * devices over the limit for the AMQPS or AMQPS_WS protocol.
 * @throws MultiplexingClientRegistrationException if the device IO layer reports that one or more
 * devices failed to register. Devices that are not listed in the exception's registration exceptions
 * are recorded as registered before the exception is rethrown.
 * @throws InterruptedException if thrown by the device IO layer's registration call.
 * @throws IotHubClientException if thrown by the device IO layer's registration call.
 */
public void registerDeviceClients(Iterable<DeviceClient> deviceClients, long timeoutMilliseconds) throws InterruptedException, IotHubClientException
{
    Objects.requireNonNull(deviceClients);

    if (timeoutMilliseconds <= 0)
    {
        throw new IllegalArgumentException("Cannot set a device registration timeout of less than or equal to 0 milliseconds");
    }

    synchronized (this.operationLock)
    {
        // Snapshot of the supplied clients so that the iterable only needs to be traversed once, even
        // when the registration fails and the local state has to be reconciled afterwards.
        List<DeviceClient> clientsToRegister = new ArrayList<>();
        Map<String, DeviceClient> devicesToRegisterMap = new HashMap<>();

        // Number of distinct device ids in this call that are not registered yet
        int newDeviceCount = 0;

        // Phase 1: validation only. No device client is modified in this loop so that a failure on any
        // client does not leave the earlier clients half-converted to multiplexed clients.
        for (DeviceClient deviceClientToRegister : deviceClients)
        {
            ClientConfiguration configToAdd = deviceClientToRegister.getConfig();
            String deviceId = configToAdd.getDeviceId();

            if (configToAdd.getAuthenticationType() != ClientConfiguration.AuthType.SAS_TOKEN)
            {
                throw new UnsupportedOperationException("Can only register to multiplex a device client that uses SAS token based authentication");
            }

            if (configToAdd.getProtocol() != this.protocol)
            {
                throw new UnsupportedOperationException("A device client cannot be registered to a multiplexing client that specifies a different transport protocol.");
            }

            // A device only counts towards the limit if it is neither registered already nor a repeat of
            // a device id seen earlier in this same call.
            boolean firstOccurrenceInThisCall = devicesToRegisterMap.put(deviceId, deviceClientToRegister) == null;
            if (firstOccurrenceInThisCall && !this.multiplexedDeviceClients.containsKey(deviceId))
            {
                newDeviceCount++;
            }

            // The limit applies to the number of devices that would be registered after this call, not
            // just to the number that are registered right now.
            int projectedDeviceCount = this.multiplexedDeviceClients.size() + newDeviceCount;

            if (this.protocol == IotHubClientProtocol.AMQPS && projectedDeviceCount > MAX_MULTIPLEX_DEVICE_COUNT_AMQPS)
            {
                throw new UnsupportedOperationException(String.format("Multiplexed connections over AMQPS only support up to %d devices", MAX_MULTIPLEX_DEVICE_COUNT_AMQPS));
            }

            // Typically client side validation is duplicate work, but IoT hub doesn't give a good error message when closing the
            // AMQPS_WS connection so this is the only way that users will know about this limit
            if (this.protocol == IotHubClientProtocol.AMQPS_WS && projectedDeviceCount > MAX_MULTIPLEX_DEVICE_COUNT_AMQPS_WS)
            {
                throw new UnsupportedOperationException(String.format("Multiplexed connections over AMQPS_WS only support up to %d devices", MAX_MULTIPLEX_DEVICE_COUNT_AMQPS_WS));
            }

            if (!this.hostName.equalsIgnoreCase(configToAdd.getIotHubHostname()))
            {
                throw new UnsupportedOperationException("A device client cannot be registered to a multiplexing client that specifies a different host name.");
            }

            if (deviceClientToRegister.getDeviceIO() != null && deviceClientToRegister.getDeviceIO().isOpen() && !deviceClientToRegister.isMultiplexed)
            {
                throw new UnsupportedOperationException("Cannot register a device client to a multiplexed connection when the device client was already opened.");
            }

            clientsToRegister.add(deviceClientToRegister);
        }

        // Phase 2: every client passed validation, so it is now safe to modify them.
        List<ClientConfiguration> clientConfigsToRegister = new ArrayList<>();
        for (DeviceClient deviceClientToRegister : clientsToRegister)
        {
            ClientConfiguration configToAdd = deviceClientToRegister.getConfig();

            // Overwrite the proxy settings of the new client to match the multiplexing client settings
            configToAdd.setProxySettings(this.proxySettings);

            deviceClientToRegister.setAsMultiplexed();
            deviceClientToRegister.setDeviceIO(this.deviceIO);
            deviceClientToRegister.markAsMultiplexed();

            if (this.multiplexedDeviceClients.containsKey(configToAdd.getDeviceId()))
            {
                log.debug("Device {} wasn't registered to the multiplexed connection because it is already registered.", configToAdd.getDeviceId());
            }
            else
            {
                log.info("Registering device {} to multiplexing client", configToAdd.getDeviceId());
                clientConfigsToRegister.add(configToAdd);
            }
        }

        try
        {
            this.deviceIO.registerMultiplexedDeviceClient(clientConfigsToRegister, timeoutMilliseconds);

            // Only update the local state map once the register call has succeeded
            this.multiplexedDeviceClients.putAll(devicesToRegisterMap);
        }
        catch (MultiplexingClientRegistrationException e)
        {
            // If registration failed, 1 or more clients should not be considered registered in this layer's state.
            // Bring the local state map in line with what the exception reports before rethrowing: only the
            // devices that have no registration exception recorded against them are stored as registered.
            for (DeviceClient clientThatAttemptedToRegister : clientsToRegister)
            {
                String deviceIdThatAttemptedToRegister = clientThatAttemptedToRegister.getConfig().getDeviceId();
                if (!e.getRegistrationExceptions().containsKey(deviceIdThatAttemptedToRegister))
                {
                    this.multiplexedDeviceClients.put(deviceIdThatAttemptedToRegister, clientThatAttemptedToRegister);
                }
            }

            throw e;
        }
    }
}