private boolean processTransportWorkSender()

in proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java [558:709]


    private boolean processTransportWorkSender(DeliveryImpl delivery,
                                               SenderImpl snd)
    {
        TransportSender tpLink = snd.getTransportLink();
        SessionImpl session = snd.getSession();
        TransportSession tpSession = session.getTransportSession();

        if (tpSession != null && tpSession.endSent()) {
            // Too late to action this work, clear it.
            return true;
        }

        boolean wasDone = delivery.isDone();

        if(!delivery.isDone() &&
           (delivery.getDataLength() > 0 || delivery != snd.current()) &&
           tpSession.hasOutgoingCredit() && tpLink.hasCredit() &&
           tpSession.isLocalChannelSet() &&
           tpLink.getLocalHandle() != null && !_frameWriter.isFull())
        {
            DeliveryImpl inProgress = tpLink.getInProgressDelivery();
            if(inProgress != null){
                // There is an existing Delivery awaiting completion. Check it
                // is the same Delivery object given and return if not, as we
                // can't interleave Transfer frames for deliveries on a link.
                if(inProgress != delivery) {
                    return false;
                }
            }

            TransportDelivery tpDelivery = delivery.getTransportDelivery();
            UnsignedInteger deliveryId;
            if (tpDelivery != null) {
                deliveryId = tpDelivery.getDeliveryId();
            } else {
                deliveryId = tpSession.getOutgoingDeliveryId();
                tpSession.incrementOutgoingDeliveryId();
            }
            tpDelivery = new TransportDelivery(deliveryId, delivery, tpLink);
            delivery.setTransportDelivery(tpDelivery);

            cachedTransfer.setDeliveryId(deliveryId);
            cachedTransfer.setDeliveryTag(new Binary(delivery.getTag()));
            cachedTransfer.setHandle(tpLink.getLocalHandle());
            cachedTransfer.setRcvSettleMode(null);
            cachedTransfer.setResume(false); // Ensure default is written
            cachedTransfer.setAborted(false); // Ensure default is written
            cachedTransfer.setBatchable(false); // Ensure default is written

            if(delivery.getLocalState() != null)
            {
                cachedTransfer.setState(delivery.getLocalState());
            }
            else
            {
                cachedTransfer.setState(null);
            }

            if(delivery.isSettled())
            {
                cachedTransfer.setSettled(Boolean.TRUE);
            }
            else
            {
                cachedTransfer.setSettled(Boolean.FALSE);
                tpSession.addUnsettledOutgoing(deliveryId, delivery);
            }

            if(snd.current() == delivery)
            {
                cachedTransfer.setMore(true);
            }
            else
            {
                // Partial transfers will reset this as needed to true in the FrameWriter
                cachedTransfer.setMore(false);
            }

            int messageFormat = delivery.getMessageFormat();
            if(messageFormat == DeliveryImpl.DEFAULT_MESSAGE_FORMAT) {
                cachedTransfer.setMessageFormat(UnsignedInteger.ZERO);
            } else {
                cachedTransfer.setMessageFormat(UnsignedInteger.valueOf(messageFormat));
            }

            ReadableBuffer payload = delivery.getData();

            int pending = payload.remaining();

            try {
                writeFrame(tpSession.getLocalChannel(), cachedTransfer, payload, () -> cachedTransfer.setMore(true));
            } finally {
                delivery.afterSend();  // Allow for freeing resources after write of buffered data
            }

            tpSession.incrementOutgoingId();
            tpSession.decrementRemoteIncomingWindow();

            if (payload == null || !payload.hasRemaining())
            {
                session.incrementOutgoingBytes(-pending);

                if (!cachedTransfer.getMore()) {
                    // Clear the in-progress delivery marker
                    tpLink.setInProgressDelivery(null);

                    delivery.setDone();
                    tpLink.setDeliveryCount(tpLink.getDeliveryCount().add(UnsignedInteger.ONE));
                    tpLink.setLinkCredit(tpLink.getLinkCredit().subtract(UnsignedInteger.ONE));
                    session.incrementOutgoingDeliveries(-1);
                    snd.decrementQueued();
                }
            }
            else
            {
                session.incrementOutgoingBytes(-(pending - payload.remaining()));

                // Remember the delivery we are still processing
                // the body transfer frames for
                tpLink.setInProgressDelivery(delivery);
            }

            if (_emitFlowEventOnSend && snd.getLocalState() != EndpointState.CLOSED) {
                getConnectionImpl().put(Event.Type.LINK_FLOW, snd);
            }
        }

        if(wasDone && delivery.getLocalState() != null && tpSession.isLocalChannelSet())
        {
            TransportDelivery tpDelivery = delivery.getTransportDelivery();
            // Use cached object as holder of data for immediate write to the FrameWriter
            cachedDisposition.setFirst(tpDelivery.getDeliveryId());
            cachedDisposition.setLast(tpDelivery.getDeliveryId());
            cachedDisposition.setRole(Role.SENDER);
            cachedDisposition.setSettled(delivery.isSettled());
            cachedDisposition.setBatchable(false);  // Enforce default is written
            if(delivery.isSettled())
            {
                tpDelivery.settled();
            }
            cachedDisposition.setState(delivery.getLocalState());

            writeFrame(tpSession.getLocalChannel(), cachedDisposition, null, null);
        }

        if(!wasDone && tpLink != null && tpLink.detachSent()) {
            // Too late to action this work, clear it.
            return true;
        }

        return !delivery.isBuffered();
    }