in iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/mqtt/Mqtt.java [187:241]
/**
 * Publishes a message to the given MQTT topic and records it as awaiting acknowledgement.
 *
 * <p>The call blocks (polling every 10 ms) while the client's number of pending delivery tokens is at or
 * above {@code MAX_IN_FLIGHT_COUNT}. If the client is found to be disconnected, either up front or while
 * waiting, a retryable {@link TransportException} is thrown so the caller can requeue the message.
 *
 * <p>Note that the connection check runs before argument validation, so invalid arguments are only
 * reported while the client is connected.
 *
 * @param publishTopic the topic to publish to; must not be null or empty
 * @param message the message to publish; must not be null and must have a non-null payload
 * @throws TransportException if the client is disconnected (marked retryable), if the underlying publish
 *         fails with an {@code MqttException}, or if the thread is interrupted while waiting for in-flight
 *         messages to drain (in which case the thread's interrupt status is restored before throwing)
 * @throws IllegalArgumentException if the message, its payload, or the topic is null, or the topic is empty
 *         (the exception text only mentions the topic)
 */
void publish(String publishTopic, Message message) throws TransportException
{
    try
    {
        if (!this.mqttAsyncClient.isConnected())
        {
            TransportException transportException = new TransportException("Cannot publish when mqtt client is disconnected");
            transportException.setRetryable(true);
            throw transportException;
        }

        if (message == null || publishTopic == null || publishTopic.length() == 0 || message.getBytes() == null)
        {
            throw new IllegalArgumentException("Cannot publish on null or empty publish topic");
        }

        byte[] payload = message.getBytes();

        // Wait until either the number of in flight messages is below the limit before publishing another message
        // Or wait until the connection is lost so the message can be requeued for later
        while (this.mqttAsyncClient.getPendingDeliveryTokens().length >= MAX_IN_FLIGHT_COUNT)
        {
            //noinspection BusyWait
            Thread.sleep(10);

            if (!this.mqttAsyncClient.isConnected())
            {
                TransportException transportException = new TransportException("Cannot publish when mqtt client is holding " + MAX_IN_FLIGHT_COUNT + " tokens and is disconnected");
                transportException.setRetryable(true);
                throw transportException;
            }
        }

        // An empty payload is sent as a default-constructed MqttMessage rather than one wrapping an empty array
        MqttMessage mqttMessage = (payload.length == 0) ? new MqttMessage() : new MqttMessage(payload);
        mqttMessage.setQos(QOS);

        // Publish and record the message id under the same lock.
        // NOTE(review): presumably this keeps the acknowledgement path from looking up the id before it has
        // been stored - confirm against the other users of unacknowledgedSentMessagesLock.
        synchronized (this.unacknowledgedSentMessagesLock)
        {
            log.trace("Publishing message ({}) to MQTT topic {}", message, publishTopic);
            IMqttDeliveryToken publishToken = this.mqttAsyncClient.publish(publishTopic, mqttMessage);
            unacknowledgedSentMessages.put(publishToken.getMessageId(), message);
            log.trace("Message published to MQTT topic {}. Mqtt message id {} added to list of messages to wait for acknowledgement ({})", publishTopic, publishToken.getMessageId(), message);
        }
    }
    catch (MqttException e)
    {
        log.warn("Message could not be published to MQTT topic {} ({})", publishTopic, message, e);
        throw PahoExceptionTranslator.convertToMqttException(e, "Unable to publish message on topic : " + publishTopic);
    }
    catch (InterruptedException e)
    {
        // Thread.sleep clears the interrupt flag when it throws; restore it so code further up the stack
        // can still observe that this thread was asked to stop.
        Thread.currentThread().interrupt();
        throw new TransportException("Interrupted, Unable to publish message on topic : " + publishTopic, e);
    }
}