in iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java [764:812]
public void sendMessages()
{
checkForExpiredMessages();
if (this.connectionStatus == IotHubConnectionStatus.DISCONNECTED
|| this.connectionStatus == IotHubConnectionStatus.DISCONNECTED_RETRYING)
{
return;
}
int timeSlice = maxNumberOfMessagesToSendPerThread;
synchronized (this.waitingPacketsLock)
{
while (this.connectionStatus == IotHubConnectionStatus.CONNECTED && timeSlice-- > 0)
{
IotHubTransportPacket packet = waitingPacketsQueue.poll();
if (packet != null)
{
Message message = packet.getMessage();
log.trace("Dequeued a message from waiting queue to be sent ({})", message);
if (message != null && this.isMessageValid(packet))
{
sendPacket(packet);
try
{
String correlationId = message.getCorrelationId();
if (!correlationId.isEmpty())
{
CorrelationCallbackContext callbackContext = correlationCallbacks.get(correlationId);
if (callbackContext != null && callbackContext.getCallback() != null)
{
callbackContext.getCallback().onRequestSent(message, callbackContext.getUserContext());
}
}
}
catch (Exception e)
{
log.warn("Exception thrown while calling the onRequestSent callback in sendMessages", e);
}
}
}
}
}
}