in broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java [180:333]
public void doSend(final MessageInstanceConsumer consumer, final MessageInstance entry, boolean batch)
{
    // Sends the message held by 'entry' to this target's session as a MessageTransfer.
    //
    //   consumer - the consumer the delivery is made on behalf of; handed to the
    //              accept/disposition listeners created below
    //   entry    - the message instance being delivered
    //   batch    - when true the transfer is created with the BATCHED option
    //
    // Throws MessageConversionException when the message is not already a
    // MessageTransferMessage and fails checkValid().
    //
    // Everything that is disposable (the body buffer, the transfer and any converted
    // message) is released in finally blocks so that an exception part-way through
    // does not leak it.

    final ServerMessage serverMsg = entry.getMessage();
    final MessageTransferMessage msg;
    MessageConverter<? super ServerMessage, MessageTransferMessage> converter = null;

    if (serverMsg instanceof MessageTransferMessage)
    {
        msg = (MessageTransferMessage) serverMsg;
    }
    else
    {
        if (!serverMsg.checkValid())
        {
            throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMsg));
        }
        converter = (MessageConverter<? super ServerMessage, MessageTransferMessage>) MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class);
        msg = converter.convert(serverMsg, _session.getAddressSpace());
    }

    // Declared outside the try so the finally block can release it if a later step throws.
    MessageTransfer xfr = null;
    try
    {
        final boolean hasHeader = msg.getHeader() != null;

        final DeliveryProperties deliveryProps =
                copyOriginalDeliveryProperties(hasHeader ? msg.getHeader().getDeliveryProperties() : null);
        deliveryProps.setRedelivered(entry.isRedelivered());

        // NOTE(review): this is the properties object obtained from the message's own header and
        // it is mutated in place below (setContentEncoding). If getMessageProperties() returns the
        // message's live instance rather than a copy, that change is visible to later deliveries
        // of the same message - confirm.
        MessageProperties messageProps = hasHeader ? msg.getHeader().getMessageProperties() : null;

        final boolean msgCompressed =
                messageProps != null && GZIPUtils.GZIP_CONTENT_ENCODING.equals(messageProps.getContentEncoding());

        QpidByteBuffer bodyBuffer = msg.getBody();
        try
        {
            final boolean compressionSupported =
                    _session.getConnection().getConnectionDelegate().isCompressionSupported();

            if (msgCompressed && !compressionSupported && bodyBuffer != null)
            {
                // The peer cannot handle gzip: send the inflated body and drop the encoding marker.
                QpidByteBuffer uncompressedBuffer = inflateIfPossible(bodyBuffer);
                messageProps.setContentEncoding(null);
                bodyBuffer.dispose();
                bodyBuffer = uncompressedBuffer;
            }
            else if (!msgCompressed
                     && compressionSupported
                     && (messageProps == null || messageProps.getContentEncoding() == null)
                     && bodyBuffer != null
                     && bodyBuffer.remaining() > _session.getConnection().getMessageCompressionThreshold())
            {
                // The peer handles gzip, the body has no encoding yet and is larger than the
                // connection's compression threshold: send it deflated.
                QpidByteBuffer compressedBuffers = deflateIfPossible(bodyBuffer);
                if (messageProps == null)
                {
                    messageProps = new MessageProperties();
                }
                messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING);
                bodyBuffer.dispose();
                bodyBuffer = compressedBuffers;
            }

            final Header header = new Header(deliveryProps,
                                             messageProps,
                                             hasHeader ? msg.getHeader().getNonStandardProperties() : null);

            xfr = batch ? new MessageTransfer(_name, _acceptMode, _acquireMode, header, bodyBuffer, BATCHED)
                        : new MessageTransfer(_name, _acceptMode, _acquireMode, header, bodyBuffer);
        }
        finally
        {
            // The local body reference is released once the transfer has been built (as the
            // original code did) and, additionally, when building it failed.
            if (bodyBuffer != null)
            {
                bodyBuffer.dispose();
            }
        }

        if (_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
        {
            xfr.setCompletionListener(new MessageAcceptCompletionListener(this, consumer, _session, entry, _flowMode == MessageFlowMode.WINDOW));
        }
        else if (_flowMode == MessageFlowMode.WINDOW)
        {
            final long messageSize = entry.getMessage().getSize();
            xfr.setCompletionListener(method -> deferredAddCredit(1, messageSize));
        }

        _postIdSettingAction.setXfr(xfr);
        _postIdSettingAction.setAction(null);

        if (_acquireMode == MessageAcquireMode.PRE_ACQUIRED)
        {
            entry.incrementDeliveryCount();
        }

        if (_acceptMode == MessageAcceptMode.EXPLICIT)
        {
            _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this, consumer));
        }
        else if (_acquireMode != MessageAcquireMode.PRE_ACQUIRED)
        {
            _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this, consumer));
        }

        _session.sendMessage(xfr, _postIdSettingAction);
    }
    finally
    {
        // Same release order as the success path always had: transfer, converted message,
        // then clear the shared post-id action so it does not keep references between sends.
        if (xfr != null)
        {
            xfr.dispose();
        }
        if (converter != null)
        {
            converter.dispose(msg);
        }
        _postIdSettingAction.setAction(null);
        _postIdSettingAction.setXfr(null);
    }

    // Only reached when the send completed without throwing.
    if (_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
    {
        forceDequeue(entry, false);
    }
    else if (_acquireMode == MessageAcquireMode.PRE_ACQUIRED)
    {
        addUnacknowledgedMessage(entry);
    }
}

/**
 * Builds the delivery properties for an outgoing transfer from those of the original message.
 * Only the fields the original actually has set (delivery mode, exchange, expiration, priority,
 * routing key, timestamp and ttl) are copied; nothing else is carried over.
 *
 * @param source the original message's delivery properties, may be {@code null}
 * @return a new {@code DeliveryProperties} instance, empty when {@code source} is {@code null}
 */
private static DeliveryProperties copyOriginalDeliveryProperties(final DeliveryProperties source)
{
    final DeliveryProperties copy = new DeliveryProperties();
    if (source == null)
    {
        return copy;
    }
    if (source.hasDeliveryMode())
    {
        copy.setDeliveryMode(source.getDeliveryMode());
    }
    if (source.hasExchange())
    {
        copy.setExchange(source.getExchange());
    }
    if (source.hasExpiration())
    {
        copy.setExpiration(source.getExpiration());
    }
    if (source.hasPriority())
    {
        copy.setPriority(source.getPriority());
    }
    if (source.hasRoutingKey())
    {
        copy.setRoutingKey(source.getRoutingKey());
    }
    if (source.hasTimestamp())
    {
        copy.setTimestamp(source.getTimestamp());
    }
    if (source.hasTtl())
    {
        copy.setTtl(source.getTtl());
    }
    return copy;
}