public void doSend()

in broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java [131:333]


    public void doSend(final MessageInstanceConsumer consumer, final MessageInstance entry, boolean batch)
    {
        ServerMessage serverMessage = entry.getMessage();
        Message_1_0 message;
        final MessageConverter<? super ServerMessage, Message_1_0> converter;
        if(serverMessage instanceof Message_1_0)
        {
            converter = null;
            message = (Message_1_0) serverMessage;
        }
        else
        {
            if (!serverMessage.checkValid())
            {
                throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMessage));
            }
            converter =
                    (MessageConverter<? super ServerMessage, Message_1_0>) MessageConverterRegistry.getConverter(serverMessage.getClass(), Message_1_0.class);
            if (converter == null)
            {
                throw new ServerScopedRuntimeException(String.format(
                        "Could not find message converter from '%s' to '%s'."
                        + " This is unexpected since we should not try to send if the converter is not present.",
                        serverMessage.getClass(),
                        Message_1_0.class));
            }
            message = converter.convert(serverMessage, _linkEndpoint.getAddressSpace());
        }

        Transfer transfer = new Transfer();
        try
        {
            QpidByteBuffer bodyContent = message.getContent();
            HeaderSection headerSection = message.getHeaderSection();

            UnsignedInteger ttl = headerSection == null ? null : headerSection.getValue().getTtl();
            if (entry.getDeliveryCount() != 0 || ttl != null)
            {
                Header header = new Header();
                if (headerSection != null)
                {
                    final Header oldHeader = headerSection.getValue();
                    header.setDurable(oldHeader.getDurable());
                    header.setPriority(oldHeader.getPriority());

                    if (ttl != null)
                    {
                        long timeSpentOnBroker = System.currentTimeMillis() - message.getArrivalTime();
                        final long adjustedTtl = Math.max(0L, ttl.longValue() - timeSpentOnBroker);
                        header.setTtl(UnsignedInteger.valueOf(adjustedTtl));
                    }
                    headerSection.dispose();
                }

                if (entry.getDeliveryCount() != 0)
                {
                    header.setDeliveryCount(UnsignedInteger.valueOf(entry.getDeliveryCount()));
                }

                headerSection = header.createEncodingRetainingSection();
            }
            List<QpidByteBuffer> payload = new ArrayList<>();
            if(headerSection != null)
            {
                payload.add(headerSection.getEncodedForm());
                headerSection.dispose();
            }
            EncodingRetainingSection<?> section;
            if((section = message.getDeliveryAnnotationsSection()) != null)
            {
                payload.add(section.getEncodedForm());
                section.dispose();
            }

            if((section = message.getMessageAnnotationsSection()) != null)
            {
                payload.add(section.getEncodedForm());
                section.dispose();
            }

            if((section = message.getPropertiesSection()) != null)
            {
                payload.add(section.getEncodedForm());
                section.dispose();
            }

            if((section = message.getApplicationPropertiesSection()) != null)
            {
                payload.add(section.getEncodedForm());
                section.dispose();
            }

            payload.add(bodyContent);

            if((section = message.getFooterSection()) != null)
            {
                payload.add(section.getEncodedForm());
                section.dispose();
            }

            try (QpidByteBuffer combined = QpidByteBuffer.concatenate(payload))
            {
                transfer.setPayload(combined);
            }

            payload.forEach(QpidByteBuffer::dispose);

            byte[] data = new byte[8];
            ByteBuffer.wrap(data).putLong(_deliveryTag++);
            final Binary tag = new Binary(data);

            transfer.setDeliveryTag(tag);

            if (_linkEndpoint.isAttached())
            {
                boolean sendPreSettled = SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode());
                if (sendPreSettled)
                {
                    transfer.setSettled(true);
                    if (_acquires && _transactionId == null)
                    {
                        transfer.setState(new Accepted());
                    }
                }
                else
                {
                    final UnsettledAction action;
                    if (_acquires)
                    {
                        action = new DispositionAction(tag, entry, consumer);
                        addUnacknowledgedMessage(entry);
                    }
                    else
                    {
                        action = new DoNothingAction();
                    }

                    _linkEndpoint.addUnsettled(tag, action, entry);
                }

                if (_transactionId != null)
                {
                    TransactionalState state = new TransactionalState();
                    state.setTxnId(_transactionId);
                    transfer.setState(state);
                }
                if (_acquires && _transactionId != null)
                {
                    try
                    {
                        ServerTransaction txn = _linkEndpoint.getTransaction(_transactionId);

                        txn.addPostTransactionAction(new ServerTransaction.Action()
                        {
                            @Override
                            public void postCommit()
                            {
                            }

                            @Override
                            public void onRollback()
                            {
                                entry.release(consumer);
                                _linkEndpoint.updateDisposition(tag, null, true);
                            }
                        });
                        final TransactionLogResource owningResource = entry.getOwningResource();
                        if (owningResource instanceof TransactionMonitor)
                        {
                            ((TransactionMonitor) owningResource).registerTransaction(txn);
                        }
                    }
                    catch (UnknownTransactionException e)
                    {
                        entry.release(consumer);
                        getEndpoint().close(new Error(TransactionError.UNKNOWN_ID, e.getMessage()));
                        return;
                    }

                }
                getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
                getEndpoint().transfer(transfer, false);

                if (sendPreSettled && _acquires && _transactionId == null)
                {
                    handleAcquiredEntrySentPareSettledNonTransactional(entry, consumer);
                }
            }
            else
            {
                entry.release(consumer);
            }

        }
        finally
        {
            transfer.dispose();
            if(converter != null)
            {
                converter.dispose(message);
            }
        }
    }