in qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java [119:178]
private void doSend(JmsOutboundMessageDispatch envelope, InFlightSend send) throws ProviderException {
LOG.trace("Producer sending message: {}", envelope);
boolean presettle = envelope.isPresettle() || isPresettle();
Delivery delivery = null;
if (presettle) {
delivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
} else {
byte[] tag = tagGenerator.getNextTag();
delivery = getEndpoint().delivery(tag, 0, tag.length);
}
if (session.isTransacted()) {
AmqpTransactionContext context = session.getTransactionContext();
delivery.disposition(context.getTxnEnrolledState());
context.registerTxProducer(this);
}
// Write the already encoded AMQP message into the Sender
ByteBuf encoded = (ByteBuf) envelope.getPayload();
getEndpoint().sendNoCopy(new AmqpReadableBuffer(encoded.duplicate()));
AmqpProvider provider = getParent().getProvider();
if (!presettle && getSendTimeout() != JmsConnectionInfo.INFINITE && send.requestTimeout == null) {
send.requestTimeout = getParent().getProvider().scheduleRequestTimeout(send, getSendTimeout(), send);
}
if (presettle) {
delivery.settle();
} else {
sent.put(envelope.getMessageId(), send);
getEndpoint().advance();
}
send.setDelivery(delivery);
delivery.setContext(send);
// Put it on the wire and let it fail if the connection is broken, if it does
// get written then continue on to determine when we should complete it.
if (provider.pumpToProtonTransport(send, false)) {
// For presettled messages we can just mark as successful and we are done, but
// for any other message we still track it until the remote settles. If the send
// was tagged as asynchronous we must mark the original request as complete but
// we still need to wait for the disposition before we can consider the send as
// having been successful.
if (presettle) {
send.onSuccess();
} else if (envelope.isSendAsync()) {
send.getOriginalRequest().onSuccess();
}
try {
provider.getTransport().flush();
} catch (Throwable ex) {
throw ProviderExceptionSupport.createOrPassthroughFatal(ex);
}
}
}