void publish()

in iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/mqtt/Mqtt.java [187:241]


    void publish(String publishTopic, Message message) throws TransportException
    {
        try
        {
            if (!this.mqttAsyncClient.isConnected())
            {
                TransportException transportException = new TransportException("Cannot publish when mqtt client is disconnected");
                transportException.setRetryable(true);
                throw transportException;
            }

            if (message == null || publishTopic == null || publishTopic.length() == 0 || message.getBytes() == null)
            {
                throw new IllegalArgumentException("Cannot publish on null or empty publish topic");
            }

            byte[] payload = message.getBytes();

            // Wait until either the number of in flight messages is below the limit before publishing another message
            // Or wait until the connection is lost so the message can be requeued for later
            while (this.mqttAsyncClient.getPendingDeliveryTokens().length >= MAX_IN_FLIGHT_COUNT)
            {
                //noinspection BusyWait
                Thread.sleep(10);

                if (!this.mqttAsyncClient.isConnected())
                {
                    TransportException transportException = new TransportException("Cannot publish when mqtt client is holding " + MAX_IN_FLIGHT_COUNT + " tokens and is disconnected");
                    transportException.setRetryable(true);
                    throw transportException;
                }
            }

            MqttMessage mqttMessage = (payload.length == 0) ? new MqttMessage() : new MqttMessage(payload);

            mqttMessage.setQos(QOS);

            synchronized (this.unacknowledgedSentMessagesLock)
            {
                log.trace("Publishing message ({}) to MQTT topic {}", message, publishTopic);
                IMqttDeliveryToken publishToken = this.mqttAsyncClient.publish(publishTopic, mqttMessage);
                unacknowledgedSentMessages.put(publishToken.getMessageId(), message);
                log.trace("Message published to MQTT topic {}. Mqtt message id {} added to list of messages to wait for acknowledgement ({})", publishTopic, publishToken.getMessageId(), message);
            }
        }
        catch (MqttException e)
        {
            log.warn("Message could not be published to MQTT topic {} ({})", publishTopic, message, e);
            throw PahoExceptionTranslator.convertToMqttException(e, "Unable to publish message on topic : " + publishTopic);
        }
        catch (InterruptedException e)
        {
            throw new TransportException("Interrupted, Unable to publish message on topic : " + publishTopic, e);
        }
    }