in proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java [558:709]
private boolean processTransportWorkSender(DeliveryImpl delivery,
SenderImpl snd)
{
TransportSender tpLink = snd.getTransportLink();
SessionImpl session = snd.getSession();
TransportSession tpSession = session.getTransportSession();
if (tpSession != null && tpSession.endSent()) {
// Too late to action this work, clear it.
return true;
}
boolean wasDone = delivery.isDone();
if(!delivery.isDone() &&
(delivery.getDataLength() > 0 || delivery != snd.current()) &&
tpSession.hasOutgoingCredit() && tpLink.hasCredit() &&
tpSession.isLocalChannelSet() &&
tpLink.getLocalHandle() != null && !_frameWriter.isFull())
{
DeliveryImpl inProgress = tpLink.getInProgressDelivery();
if(inProgress != null){
// There is an existing Delivery awaiting completion. Check it
// is the same Delivery object given and return if not, as we
// can't interleave Transfer frames for deliveries on a link.
if(inProgress != delivery) {
return false;
}
}
TransportDelivery tpDelivery = delivery.getTransportDelivery();
UnsignedInteger deliveryId;
if (tpDelivery != null) {
deliveryId = tpDelivery.getDeliveryId();
} else {
deliveryId = tpSession.getOutgoingDeliveryId();
tpSession.incrementOutgoingDeliveryId();
}
tpDelivery = new TransportDelivery(deliveryId, delivery, tpLink);
delivery.setTransportDelivery(tpDelivery);
cachedTransfer.setDeliveryId(deliveryId);
cachedTransfer.setDeliveryTag(new Binary(delivery.getTag()));
cachedTransfer.setHandle(tpLink.getLocalHandle());
cachedTransfer.setRcvSettleMode(null);
cachedTransfer.setResume(false); // Ensure default is written
cachedTransfer.setAborted(false); // Ensure default is written
cachedTransfer.setBatchable(false); // Ensure default is written
if(delivery.getLocalState() != null)
{
cachedTransfer.setState(delivery.getLocalState());
}
else
{
cachedTransfer.setState(null);
}
if(delivery.isSettled())
{
cachedTransfer.setSettled(Boolean.TRUE);
}
else
{
cachedTransfer.setSettled(Boolean.FALSE);
tpSession.addUnsettledOutgoing(deliveryId, delivery);
}
if(snd.current() == delivery)
{
cachedTransfer.setMore(true);
}
else
{
// Partial transfers will reset this as needed to true in the FrameWriter
cachedTransfer.setMore(false);
}
int messageFormat = delivery.getMessageFormat();
if(messageFormat == DeliveryImpl.DEFAULT_MESSAGE_FORMAT) {
cachedTransfer.setMessageFormat(UnsignedInteger.ZERO);
} else {
cachedTransfer.setMessageFormat(UnsignedInteger.valueOf(messageFormat));
}
ReadableBuffer payload = delivery.getData();
int pending = payload.remaining();
try {
writeFrame(tpSession.getLocalChannel(), cachedTransfer, payload, () -> cachedTransfer.setMore(true));
} finally {
delivery.afterSend(); // Allow for freeing resources after write of buffered data
}
tpSession.incrementOutgoingId();
tpSession.decrementRemoteIncomingWindow();
if (payload == null || !payload.hasRemaining())
{
session.incrementOutgoingBytes(-pending);
if (!cachedTransfer.getMore()) {
// Clear the in-progress delivery marker
tpLink.setInProgressDelivery(null);
delivery.setDone();
tpLink.setDeliveryCount(tpLink.getDeliveryCount().add(UnsignedInteger.ONE));
tpLink.setLinkCredit(tpLink.getLinkCredit().subtract(UnsignedInteger.ONE));
session.incrementOutgoingDeliveries(-1);
snd.decrementQueued();
}
}
else
{
session.incrementOutgoingBytes(-(pending - payload.remaining()));
// Remember the delivery we are still processing
// the body transfer frames for
tpLink.setInProgressDelivery(delivery);
}
if (_emitFlowEventOnSend && snd.getLocalState() != EndpointState.CLOSED) {
getConnectionImpl().put(Event.Type.LINK_FLOW, snd);
}
}
if(wasDone && delivery.getLocalState() != null && tpSession.isLocalChannelSet())
{
TransportDelivery tpDelivery = delivery.getTransportDelivery();
// Use cached object as holder of data for immediate write to the FrameWriter
cachedDisposition.setFirst(tpDelivery.getDeliveryId());
cachedDisposition.setLast(tpDelivery.getDeliveryId());
cachedDisposition.setRole(Role.SENDER);
cachedDisposition.setSettled(delivery.isSettled());
cachedDisposition.setBatchable(false); // Enforce default is written
if(delivery.isSettled())
{
tpDelivery.settled();
}
cachedDisposition.setState(delivery.getLocalState());
writeFrame(tpSession.getLocalChannel(), cachedDisposition, null, null);
}
if(!wasDone && tpLink != null && tpLink.detachSent()) {
// Too late to action this work, clear it.
return true;
}
return !delivery.isBuffered();
}