in proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java [749:795]
private void processReceiverFlow()
{
if(_connectionEndpoint != null && _isOpenSent && !_isCloseSent)
{
EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
while(endpoint != null)
{
if(endpoint instanceof ReceiverImpl)
{
ReceiverImpl receiver = (ReceiverImpl) endpoint;
TransportLink<?> transportLink = getTransportState(receiver);
TransportSession transportSession = getTransportState(receiver.getSession());
if(receiver.getLocalState() == EndpointState.ACTIVE && transportSession.isLocalChannelSet() && !receiver.detached())
{
int credits = receiver.clearUnsentCredits();
if(credits != 0 || receiver.getDrain() ||
transportSession.getIncomingWindowSize().equals(UnsignedInteger.ZERO))
{
transportLink.addCredit(credits);
writeFlow(transportSession, transportLink);
}
}
}
endpoint = endpoint.transportNext();
}
endpoint = _connectionEndpoint.getTransportHead();
while(endpoint != null)
{
if(endpoint instanceof SessionImpl)
{
SessionImpl session = (SessionImpl) endpoint;
TransportSession transportSession = getTransportState(session);
if(session.getLocalState() == EndpointState.ACTIVE && transportSession.isLocalChannelSet())
{
if(transportSession.getIncomingWindowSize().equals(UnsignedInteger.ZERO))
{
writeFlow(transportSession, null);
}
}
}
endpoint = endpoint.transportNext();
}
}
}