AmqpsSendResult sendMessageAndGetDeliveryTag()

in iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsSenderLinkHandler.java [195:257]


    AmqpsSendResult sendMessageAndGetDeliveryTag(MessageImpl protonMessage)
    {
        //Callers of this method are responsible for putting the returned delivery tag into the inProgressMessages map
        // so that this link can respond to this message being acknowledged appropriately

        //want to avoid negative delivery tags since -1 is the designated failure value
        if (this.nextTag == Integer.MAX_VALUE || this.nextTag < 0)
        {
            this.nextTag = 0;
        }
        else
        {
            this.nextTag++;
        }

        byte[] msgData = new byte[1024];
        int length;

        while (true)
        {
            try
            {
                length = protonMessage.encode(msgData, 0, msgData.length);
                break;
            }
            catch (BufferOverflowException e)
            {
                msgData = new byte[msgData.length * 2];
            }
        }

        byte[] deliveryTag = String.valueOf(this.nextTag).getBytes(StandardCharsets.UTF_8);

        Delivery delivery = this.senderLink.delivery(deliveryTag);
        try
        {
            log.trace("Sending {} bytes over the amqp {} sender link with address {} and link correlation id {} with link credit {}", length, getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId, this.senderLink.getCredit());
            int bytesSent = this.senderLink.send(msgData, 0, length);

            if (bytesSent != length)
            {
                throw new ProtocolException(String.format("Amqp send operation did not send all of the expected bytes for %s sender link with link correlation id %s, retrying to send the message", getLinkInstanceType(), this.linkCorrelationId));
            }

            boolean canAdvance = this.senderLink.advance();

            if (!canAdvance)
            {
                throw new ProtocolException(String.format("Failed to advance the senderLink after sending a message on %s sender link with link correlation id %s, retrying to send the message", getLinkInstanceType(), this.linkCorrelationId));
            }

            log.trace("Message was sent over {} sender link with address {} and link correlation id {} with delivery tag {}", getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId, new String(deliveryTag, StandardCharsets.UTF_8));
            log.trace("Current link credit on {} sender link with address {} and link correlation id {} is {}", this.getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId, senderLink.getCredit());
            return new AmqpsSendResult(deliveryTag);
        }
        catch (Exception e)
        {
            log.warn("Encountered a problem while sending a message on {} sender link with address {} and link correlation id {}", getLinkInstanceType(), this.senderLinkAddress, this.linkCorrelationId, e);
            this.senderLink.advance();
            delivery.free();
            return new AmqpsSendResult();
        }
    }