private void acknowledgeReceivedMessage()

in iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java [1233:1291]


    /**
     * Runs the user's message callback for a received message and sends the resulting acknowledgement
     * through the transport connection. After the acknowledgement has been sent, notifies the correlation
     * callback registered under the message's correlation ID (if any) and removes that registration.
     *
     * <p>Does nothing when the message has no message callback.
     *
     * @param receivedMessage the received message to hand to its callback and then acknowledge
     * @throws TransportException if sending the acknowledgement fails; in that case the message is added
     *                            back to the received messages queue before the exception is rethrown
     */
    private void acknowledgeReceivedMessage(IotHubTransportMessage receivedMessage) throws TransportException
    {
        MessageCallback messageCallback = receivedMessage.getMessageCallback();
        Object messageCallbackContext = receivedMessage.getMessageCallbackContext();

        if (messageCallback == null)
        {
            // Without a callback there is no result to acknowledge with.
            return;
        }

        // If a message callback throws an exception here the acknowledge will never be sent and this message will
        // live in Iot hub until it expires.
        IotHubMessageResult result;
        try
        {
            log.debug("Executing callback for received message ({})", receivedMessage);
            result = messageCallback.onCloudToDeviceMessageReceived(receivedMessage, messageCallbackContext);
        }
        catch (Throwable ex)
        {
            // We want to log this exception and bubble up to the transport
            log.warn("Exception thrown while calling the message callback for received message {} in acknowledgeReceivedMessage. " +
                    "This exception is preventing the completion of message delivery and can result in messages being " +
                    "stuck in IoT hub until they expire. This can prevent the client from receiving further messages.", receivedMessage, ex);
            throw ex;
        }

        try
        {
            log.debug("Sending acknowledgement for received cloud to device message ({})", receivedMessage);
            this.iotHubTransportConnection.sendMessageResult(receivedMessage, result);
        }
        catch (TransportException e)
        {
            log.warn("Sending acknowledgement for received cloud to device message failed, adding it back to the queue ({})", receivedMessage, e);
            this.addToReceivedMessagesQueue(receivedMessage);
            throw e;
        }

        // The acknowledgement has been sent at this point. Failures while notifying the correlation callback are
        // logged and swallowed on purpose so that they never cause an already acknowledged message to be re-queued.
        String correlationId = null;
        try
        {
            correlationId = receivedMessage.getCorrelationId();

            // Guard against a missing correlation ID so that it is not reported as a callback failure below.
            if (correlationId != null && !correlationId.isEmpty())
            {
                CorrelationCallbackContext callbackContext = correlationCallbacks.get(correlationId);

                if (callbackContext != null && callbackContext.getCallback() != null)
                {
                    callbackContext.getCallback().onResponseAcknowledged(receivedMessage, callbackContext.getUserContext());
                }
            }
        }
        catch (Exception ex)
        {
            log.warn("Exception thrown while calling the onResponseAcknowledged callback in acknowledgeReceivedMessage", ex);
        }
        finally
        {
            // We need to remove the CorrelatingMessageCallback with the current correlation ID from the map after the received C2D
            // message has been acknowledged. Otherwise, the size of map will grow endlessly which results in OutOfMemory eventually.
            // This is done in a finally block so the entry is also dropped when the callback above throws, and inline
            // rather than on a freshly spawned thread per message.
            // NOTE(review): assumes correlationCallbacks is safe to modify from this thread (e.g. a concurrent map) - confirm.
            if (correlationId != null && !correlationId.isEmpty())
            {
                correlationCallbacks.remove(correlationId);
            }
        }
    }