public void doSend()

in broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java [180:333]


    /**
     * Builds a {@code MessageTransfer} for the given queue entry and sends it on this target's session.
     * <p>
     * The steps are:
     * <ol>
     *   <li>obtain a {@code MessageTransferMessage}, converting the entry's message if it is of another type;</li>
     *   <li>copy the delivery properties into a fresh {@code DeliveryProperties} and set the redelivered flag
     *       from the entry;</li>
     *   <li>inflate or deflate the body depending on whether the connection supports compression;</li>
     *   <li>attach completion / disposition listeners according to the accept, acquire and flow modes;</li>
     *   <li>send the transfer and release the buffers, the transfer and any converted message;</li>
     *   <li>for pre-acquired deliveries, either force-dequeue the entry or record it as unacknowledged.</li>
     * </ol>
     * All releasable resources are disposed in a {@code finally} block, so they are released even when
     * building or sending the transfer throws.
     *
     * @param consumer the consumer on whose behalf the entry is delivered; passed on to the listeners
     * @param entry    the queue entry to deliver
     * @param batch    when {@code true} the transfer is constructed with the {@code BATCHED} option
     * @throws MessageConversionException if the message is not a {@code MessageTransferMessage} and
     *                                    {@code checkValid()} reports it as malformed
     */
    public void doSend(final MessageInstanceConsumer consumer, final MessageInstance entry, boolean batch)
    {
        final ServerMessage serverMsg = entry.getMessage();

        final MessageTransferMessage msg;
        // Stays null when no conversion was needed; doubles as the "must dispose msg" flag in the finally block.
        MessageConverter<? super ServerMessage, MessageTransferMessage> converter = null;

        if (serverMsg instanceof MessageTransferMessage)
        {
            msg = (MessageTransferMessage) serverMsg;
        }
        else
        {
            if (!serverMsg.checkValid())
            {
                throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMsg));
            }
            converter = (MessageConverter<? super ServerMessage, MessageTransferMessage>) MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class);
            msg = converter.convert(serverMsg, _session.getAddressSpace());
        }

        // Declared outside the try so that the finally block can release whatever was acquired
        // before a failure.
        MessageTransfer xfr = null;
        QpidByteBuffer bodyBuffer = null;

        try
        {
            final DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties();
            // NOTE(review): this is the MessageProperties instance held by the message's header and it is
            // mutated below via setContentEncoding(...). If that instance is shared between deliveries of
            // the same message, the change would be visible to later deliveries - confirm whether a copy
            // is required.
            MessageProperties messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties();

            // A fresh DeliveryProperties is populated field by field, so the redelivered flag set below
            // is applied to this delivery's copy and not to origDeliveryProps.
            final DeliveryProperties deliveryProps = new DeliveryProperties();
            if (origDeliveryProps != null)
            {
                if (origDeliveryProps.hasDeliveryMode())
                {
                    deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
                }
                if (origDeliveryProps.hasExchange())
                {
                    deliveryProps.setExchange(origDeliveryProps.getExchange());
                }
                if (origDeliveryProps.hasExpiration())
                {
                    deliveryProps.setExpiration(origDeliveryProps.getExpiration());
                }
                if (origDeliveryProps.hasPriority())
                {
                    deliveryProps.setPriority(origDeliveryProps.getPriority());
                }
                if (origDeliveryProps.hasRoutingKey())
                {
                    deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
                }
                if (origDeliveryProps.hasTimestamp())
                {
                    deliveryProps.setTimestamp(origDeliveryProps.getTimestamp());
                }
                if (origDeliveryProps.hasTtl())
                {
                    deliveryProps.setTtl(origDeliveryProps.getTtl());
                }
            }

            deliveryProps.setRedelivered(entry.isRedelivered());

            final boolean msgCompressed = messageProps != null && GZIPUtils.GZIP_CONTENT_ENCODING.equals(messageProps.getContentEncoding());

            bodyBuffer = msg.getBody();

            final boolean compressionSupported = _session.getConnection().getConnectionDelegate().isCompressionSupported();

            if (msgCompressed && !compressionSupported && bodyBuffer != null)
            {
                // Gzip-encoded body, but the connection does not support compression: send it inflated
                // and clear the content encoding.
                final QpidByteBuffer uncompressedBuffer = inflateIfPossible(bodyBuffer);
                messageProps.setContentEncoding(null);
                bodyBuffer.dispose();
                bodyBuffer = uncompressedBuffer;
            }
            else if (!msgCompressed
                     && compressionSupported
                     && (messageProps == null || messageProps.getContentEncoding() == null)
                     && bodyBuffer != null
                     && bodyBuffer.remaining() > _session.getConnection().getMessageCompressionThreshold())
            {
                // Body has no content encoding, the connection supports compression and the body is
                // larger than the connection's threshold: send it deflated and mark it as gzip.
                final QpidByteBuffer compressedBuffers = deflateIfPossible(bodyBuffer);
                if (messageProps == null)
                {
                    messageProps = new MessageProperties();
                }
                messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING);
                bodyBuffer.dispose();
                bodyBuffer = compressedBuffers;
            }

            final Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());

            xfr = batch ? new MessageTransfer(_name, _acceptMode, _acquireMode, header, bodyBuffer, BATCHED)
                        : new MessageTransfer(_name, _acceptMode, _acquireMode, header, bodyBuffer);

            // The local reference is released straight after the transfer is constructed; presumably
            // MessageTransfer keeps its own reference to the body - TODO confirm. Nulling the variable
            // stops the finally block from disposing it a second time.
            if (bodyBuffer != null)
            {
                bodyBuffer.dispose();
                bodyBuffer = null;
            }

            if (_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
            {
                xfr.setCompletionListener(new MessageAcceptCompletionListener(this, consumer, _session, entry, _flowMode == MessageFlowMode.WINDOW));
            }
            else if (_flowMode == MessageFlowMode.WINDOW)
            {
                final long messageSize = entry.getMessage().getSize();
                xfr.setCompletionListener(method -> deferredAddCredit(1, messageSize));
            }

            _postIdSettingAction.setXfr(xfr);
            _postIdSettingAction.setAction(null);

            if (_acquireMode == MessageAcquireMode.PRE_ACQUIRED)
            {
                entry.incrementDeliveryCount();
            }

            if (_acceptMode == MessageAcceptMode.EXPLICIT)
            {
                _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this, consumer));
            }
            else if (_acquireMode != MessageAcquireMode.PRE_ACQUIRED)
            {
                _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this, consumer));
            }

            _session.sendMessage(xfr, _postIdSettingAction);
        }
        finally
        {
            // Same cleanup, in the same order, as on the successful path; running it from finally
            // ensures a failure while preparing or sending does not leave buffers, the transfer or a
            // converted message undisposed, nor stale references in the reused _postIdSettingAction.
            if (bodyBuffer != null)
            {
                bodyBuffer.dispose();
            }
            if (xfr != null)
            {
                xfr.dispose();
            }
            if (converter != null)
            {
                converter.dispose(msg);
            }
            _postIdSettingAction.setAction(null);
            _postIdSettingAction.setXfr(null);
        }

        // Reached only when the send completed without throwing.
        if (_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
        {
            forceDequeue(entry, false);
        }
        else if (_acquireMode == MessageAcquireMode.PRE_ACQUIRED)
        {
            addUnacknowledgedMessage(entry);
        }
    }