in broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java [131:333]
public void doSend(final MessageInstanceConsumer consumer, final MessageInstance entry, boolean batch)
{
ServerMessage serverMessage = entry.getMessage();
Message_1_0 message;
final MessageConverter<? super ServerMessage, Message_1_0> converter;
if(serverMessage instanceof Message_1_0)
{
converter = null;
message = (Message_1_0) serverMessage;
}
else
{
if (!serverMessage.checkValid())
{
throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMessage));
}
converter =
(MessageConverter<? super ServerMessage, Message_1_0>) MessageConverterRegistry.getConverter(serverMessage.getClass(), Message_1_0.class);
if (converter == null)
{
throw new ServerScopedRuntimeException(String.format(
"Could not find message converter from '%s' to '%s'."
+ " This is unexpected since we should not try to send if the converter is not present.",
serverMessage.getClass(),
Message_1_0.class));
}
message = converter.convert(serverMessage, _linkEndpoint.getAddressSpace());
}
Transfer transfer = new Transfer();
try
{
QpidByteBuffer bodyContent = message.getContent();
HeaderSection headerSection = message.getHeaderSection();
UnsignedInteger ttl = headerSection == null ? null : headerSection.getValue().getTtl();
if (entry.getDeliveryCount() != 0 || ttl != null)
{
Header header = new Header();
if (headerSection != null)
{
final Header oldHeader = headerSection.getValue();
header.setDurable(oldHeader.getDurable());
header.setPriority(oldHeader.getPriority());
if (ttl != null)
{
long timeSpentOnBroker = System.currentTimeMillis() - message.getArrivalTime();
final long adjustedTtl = Math.max(0L, ttl.longValue() - timeSpentOnBroker);
header.setTtl(UnsignedInteger.valueOf(adjustedTtl));
}
headerSection.dispose();
}
if (entry.getDeliveryCount() != 0)
{
header.setDeliveryCount(UnsignedInteger.valueOf(entry.getDeliveryCount()));
}
headerSection = header.createEncodingRetainingSection();
}
List<QpidByteBuffer> payload = new ArrayList<>();
if(headerSection != null)
{
payload.add(headerSection.getEncodedForm());
headerSection.dispose();
}
EncodingRetainingSection<?> section;
if((section = message.getDeliveryAnnotationsSection()) != null)
{
payload.add(section.getEncodedForm());
section.dispose();
}
if((section = message.getMessageAnnotationsSection()) != null)
{
payload.add(section.getEncodedForm());
section.dispose();
}
if((section = message.getPropertiesSection()) != null)
{
payload.add(section.getEncodedForm());
section.dispose();
}
if((section = message.getApplicationPropertiesSection()) != null)
{
payload.add(section.getEncodedForm());
section.dispose();
}
payload.add(bodyContent);
if((section = message.getFooterSection()) != null)
{
payload.add(section.getEncodedForm());
section.dispose();
}
try (QpidByteBuffer combined = QpidByteBuffer.concatenate(payload))
{
transfer.setPayload(combined);
}
payload.forEach(QpidByteBuffer::dispose);
byte[] data = new byte[8];
ByteBuffer.wrap(data).putLong(_deliveryTag++);
final Binary tag = new Binary(data);
transfer.setDeliveryTag(tag);
if (_linkEndpoint.isAttached())
{
boolean sendPreSettled = SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode());
if (sendPreSettled)
{
transfer.setSettled(true);
if (_acquires && _transactionId == null)
{
transfer.setState(new Accepted());
}
}
else
{
final UnsettledAction action;
if (_acquires)
{
action = new DispositionAction(tag, entry, consumer);
addUnacknowledgedMessage(entry);
}
else
{
action = new DoNothingAction();
}
_linkEndpoint.addUnsettled(tag, action, entry);
}
if (_transactionId != null)
{
TransactionalState state = new TransactionalState();
state.setTxnId(_transactionId);
transfer.setState(state);
}
if (_acquires && _transactionId != null)
{
try
{
ServerTransaction txn = _linkEndpoint.getTransaction(_transactionId);
txn.addPostTransactionAction(new ServerTransaction.Action()
{
@Override
public void postCommit()
{
}
@Override
public void onRollback()
{
entry.release(consumer);
_linkEndpoint.updateDisposition(tag, null, true);
}
});
final TransactionLogResource owningResource = entry.getOwningResource();
if (owningResource instanceof TransactionMonitor)
{
((TransactionMonitor) owningResource).registerTransaction(txn);
}
}
catch (UnknownTransactionException e)
{
entry.release(consumer);
getEndpoint().close(new Error(TransactionError.UNKNOWN_ID, e.getMessage()));
return;
}
}
getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
getEndpoint().transfer(transfer, false);
if (sendPreSettled && _acquires && _transactionId == null)
{
handleAcquiredEntrySentPareSettledNonTransactional(entry, consumer);
}
}
else
{
entry.release(consumer);
}
}
finally
{
transfer.dispose();
if(converter != null)
{
converter.dispose(message);
}
}
}