in proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java [280:356]
/**
 * Processes a single incoming {@code Transfer} frame for this session.
 *
 * <p>The frame is matched to the delivery currently in progress on the receiving link, or,
 * failing that, a new delivery is created and registered as unsettled. Remote state and
 * payload are then applied to that delivery, the session/link counters are updated, and a
 * {@code DELIVERY} event is put on the connection.
 *
 * <p>NOTE(review): the link resolved from the remote handle, the looked-up in-progress
 * delivery and the delivery tag are all dereferenced without null checks here.
 *
 * @param transfer the frame being handled
 * @param payload  the bytes carried by the frame; may be {@code null}, and is not appended
 *                 when the transfer is flagged as aborted
 */
public void handleTransfer(Transfer transfer, Binary payload)
{
    // Advances the conceptual/non-wire transfer-id, which is what the session window tracks.
    incrementNextIncomingId();

    final TransportReceiver link = (TransportReceiver) getLinkFromRemoteHandle(transfer.getHandle());
    final UnsignedInteger inProgressId = link.getIncomingDeliveryId();
    final UnsignedInteger frameDeliveryId = transfer.getDeliveryId();

    // The frame continues the link's in-progress delivery when one exists and the frame either
    // repeats its delivery-id or carries none at all.
    final boolean continuesInProgress =
            inProgressId != null && (frameDeliveryId == null || inProgressId.equals(frameDeliveryId));

    final DeliveryImpl delivery;
    if (continuesInProgress)
    {
        delivery = _unsettledIncomingDeliveriesById.get(inProgressId);
        delivery.getTransportDelivery().incrementSessionSize();
    }
    else
    {
        // First frame of a new delivery: validate the id before recording anything.
        verifyNewDeliveryIdSequence(_incomingDeliveryId, inProgressId, frameDeliveryId);
        _incomingDeliveryId = frameDeliveryId;

        final ReceiverImpl receiverEndpoint = link.getReceiver();
        final Binary tag = transfer.getDeliveryTag();
        delivery = receiverEndpoint.delivery(tag.getArray(), tag.getArrayOffset(), tag.getLength());

        final UnsignedInteger format = transfer.getMessageFormat();
        if (format != null)
        {
            delivery.setMessageFormat(format.intValue());
        }

        delivery.setTransportDelivery(new TransportDelivery(frameDeliveryId, delivery, link));
        link.setIncomingDeliveryId(frameDeliveryId);
        _unsettledIncomingDeliveriesById.put(frameDeliveryId, delivery);
        getSession().incrementIncomingDeliveries(1);
    }

    // Only overwrite the remote state when the frame actually carries one.
    if (transfer.getState() != null)
    {
        delivery.setRemoteDeliveryState(transfer.getState());
    }

    _unsettledIncomingSize++;

    final boolean abortedByPeer = transfer.getAborted();
    if (!abortedByPeer && payload != null)
    {
        delivery.append(payload);
        getSession().incrementIncomingBytes(payload.getLength());
    }

    delivery.updateWork();

    final boolean moreFramesExpected = transfer.getMore();
    if (abortedByPeer || !moreFramesExpected)
    {
        // This was the last frame for the delivery, so the link no longer has one in progress.
        link.setIncomingDeliveryId(null);

        if (!abortedByPeer)
        {
            delivery.setComplete();
        }
        else
        {
            delivery.setAborted();
        }

        delivery.getLink().getTransportLink().decrementLinkCredit();
        delivery.getLink().getTransportLink().incrementDeliveryCount();
    }

    // An aborted transfer is marked remotely settled regardless of the frame's settled flag.
    final boolean settledFlagSet = Boolean.TRUE.equals(transfer.getSettled());
    if (settledFlagSet || abortedByPeer)
    {
        delivery.setRemoteSettled(true);
    }

    _incomingWindowSize = _incomingWindowSize.subtract(UnsignedInteger.ONE);

    // this will cause a flow to happen
    if (UnsignedInteger.ZERO.equals(_incomingWindowSize))
    {
        delivery.getLink().modified(false);
    }

    getSession().getConnection().put(Event.Type.DELIVERY, delivery);
}