in iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java [1233:1291]
/**
 * Hands a received cloud to device message to the message callback attached to it and then sends the
 * callback's result back through the transport connection as the acknowledgement for that message.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>If the message carries no message callback, nothing is done and no acknowledgement is sent.</li>
 *   <li>The message callback is executed. Anything it throws is logged and rethrown to the caller, and no
 *       acknowledgement is sent.</li>
 *   <li>The callback's result is sent via {@code iotHubTransportConnection.sendMessageResult}. If that throws a
 *       {@link TransportException}, the message is put back on the received messages queue and the exception
 *       is rethrown.</li>
 *   <li>If the message has a non-empty correlation ID, the correlation callback registered under that ID (if
 *       any) is notified through {@code onResponseAcknowledged}, and the entry is then removed from
 *       {@code correlationCallbacks}. Exceptions raised in this step are logged and not propagated.</li>
 * </ol>
 *
 * @param receivedMessage the received message to deliver to its callback and acknowledge.
 * @throws TransportException if sending the acknowledgement fails; the message has been re-queued by the time
 *         this is thrown. Unchecked exceptions thrown by the message callback are propagated unchanged.
 */
private void acknowledgeReceivedMessage(IotHubTransportMessage receivedMessage) throws TransportException
{
    MessageCallback messageCallback = receivedMessage.getMessageCallback();
    Object messageCallbackContext = receivedMessage.getMessageCallbackContext();
    if (messageCallback != null)
    {
        // If a message callback throws an exception here the acknowledge will never be sent and this message will
        // live in Iot hub until it expires.
        IotHubMessageResult result;
        try
        {
            log.debug("Executing callback for received message ({})", receivedMessage);
            result = messageCallback.onCloudToDeviceMessageReceived(receivedMessage, messageCallbackContext);
        }
        catch (Throwable ex)
        {
            // We want to log this exception and bubble up to the transport
            log.warn("Exception thrown while calling the message callback for received message {} in acknowledgeReceivedMessage. " +
                    "This exception is preventing the completion of message delivery and can result in messages being " +
                    "stuck in IoT hub until they expire. This can prevent the client from receiving further messages.", receivedMessage, ex);
            throw ex;
        }

        try
        {
            log.debug("Sending acknowledgement for received cloud to device message ({})", receivedMessage);
            this.iotHubTransportConnection.sendMessageResult(receivedMessage, result);

            // The acknowledgement has been sent at this point; a failure while notifying the correlation
            // callback must not undo that, so everything below is logged and swallowed.
            try
            {
                String correlationId = receivedMessage.getCorrelationId();

                // NOTE(review): whether getCorrelationId() can return null is not visible from here; the null
                // guard avoids a NullPointerException being reported as a callback failure below.
                if (correlationId != null && !correlationId.isEmpty())
                {
                    try
                    {
                        CorrelationCallbackContext callbackContext = correlationCallbacks.get(correlationId);
                        if (callbackContext != null && callbackContext.getCallback() != null)
                        {
                            callbackContext.getCallback().onResponseAcknowledged(receivedMessage, callbackContext.getUserContext());
                        }
                    }
                    finally
                    {
                        // We need to remove the CorrelatingMessageCallback with the current correlation ID from the map after the received C2D
                        // message has been acknowledged. Otherwise, the size of map will grow endlessly which results in OutOfMemory eventually.
                        // This runs in a finally block so the entry is also removed when the callback above throws.
                        // NOTE(review): the removal is kept on a separate thread as in the original code; the
                        // reason for doing it asynchronously is not visible here.
                        new Thread(() -> correlationCallbacks.remove(correlationId)).start();
                    }
                }
            }
            catch (Exception ex)
            {
                log.warn("Exception thrown while calling the onResponseAcknowledged callback in acknowledgeReceivedMessage", ex);
            }
        }
        catch (TransportException e)
        {
            // The acknowledgement did not go out: put the message back on the queue and let the caller know.
            log.warn("Sending acknowledgement for received cloud to device message failed, adding it back to the queue ({})", receivedMessage, e);
            this.addToReceivedMessagesQueue(receivedMessage);
            throw e;
        }
    }
}