in qpid/messaging/driver.py [0:0]
def grant(self, rcv):
sst = self._attachments[rcv.session]
_rcv = self._attachments.get(rcv)
if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining:
return
if rcv.granted is UNLIMITED:
if rcv.impending is UNLIMITED:
delta = 0
else:
delta = UNLIMITED
elif rcv.impending is UNLIMITED:
delta = -1
else:
delta = max(rcv.granted, rcv.received) - rcv.impending
if delta is UNLIMITED:
if not _rcv.bytes_open:
sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
_rcv.bytes_open = True
sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value))
rcv.impending = UNLIMITED
elif delta > 0:
if not _rcv.bytes_open:
sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
_rcv.bytes_open = True
sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta))
rcv.impending += delta
elif delta < 0 and not rcv.draining:
_rcv.draining = True
def do_stop():
rcv.impending = rcv.received
_rcv.draining = False
_rcv.bytes_open = False
self.grant(rcv)
sst.write_cmd(MessageStop(_rcv.destination), do_stop)
if rcv.draining:
_rcv.draining = True
def do_flush():
rcv.impending = rcv.received
rcv.granted = rcv.impending
_rcv.draining = False
_rcv.bytes_open = False
rcv.draining = False
sst.write_cmd(MessageFlush(_rcv.destination), do_flush)