in iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsSenderLinkHandler.java [195:257]
AmqpsSendResult sendMessageAndGetDeliveryTag(MessageImpl protonMessage)
{
//Callers of this method are responsible for putting the returned delivery tag into the inProgressMessages map
// so that this link can respond to this message being acknowledged appropriately
//want to avoid negative delivery tags since -1 is the designated failure value
if (this.nextTag == Integer.MAX_VALUE || this.nextTag < 0)
{
this.nextTag = 0;
}
else
{
this.nextTag++;
}
byte[] msgData = new byte[1024];
int length;
while (true)
{
try
{
length = protonMessage.encode(msgData, 0, msgData.length);
break;
}
catch (BufferOverflowException e)
{
msgData = new byte[msgData.length * 2];
}
}
byte[] deliveryTag = String.valueOf(this.nextTag).getBytes(StandardCharsets.UTF_8);
Delivery delivery = this.senderLink.delivery(deliveryTag);
try
{
log.trace("Sending {} bytes over the amqp {} sender link with address {} and link correlation id {} with link credit {}", length, getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId, this.senderLink.getCredit());
int bytesSent = this.senderLink.send(msgData, 0, length);
if (bytesSent != length)
{
throw new ProtocolException(String.format("Amqp send operation did not send all of the expected bytes for %s sender link with link correlation id %s, retrying to send the message", getLinkInstanceType(), this.linkCorrelationId));
}
boolean canAdvance = this.senderLink.advance();
if (!canAdvance)
{
throw new ProtocolException(String.format("Failed to advance the senderLink after sending a message on %s sender link with link correlation id %s, retrying to send the message", getLinkInstanceType(), this.linkCorrelationId));
}
log.trace("Message was sent over {} sender link with address {} and link correlation id {} with delivery tag {}", getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId, new String(deliveryTag, StandardCharsets.UTF_8));
log.trace("Current link credit on {} sender link with address {} and link correlation id {} is {}", this.getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId, senderLink.getCredit());
return new AmqpsSendResult(deliveryTag);
}
catch (Exception e)
{
log.warn("Encountered a problem while sending a message on {} sender link with address {} and link correlation id {}", getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId, e);
this.senderLink.advance();
delivery.free();
return new AmqpsSendResult();
}
}