in qpid/messaging/driver.py [0:0]
def process(self, ssn):
if ssn.closed or ssn.closing: return
sst = self._attachments[ssn]
while sst.outgoing_idx < len(ssn.outgoing):
msg = ssn.outgoing[sst.outgoing_idx]
snd = msg._sender
# XXX: should check for sender error here
_snd = self._attachments.get(snd)
if _snd and snd.linked:
self.send(snd, msg)
sst.outgoing_idx += 1
else:
break
for snd in ssn.senders:
# XXX: should included snd.acked in this
if snd.synced >= snd.queued and sst.need_sync:
sst.write_cmd(ExecutionSync(), sync_noop)
for rcv in ssn.receivers:
self.process_receiver(rcv)
if ssn.acked:
messages = ssn.acked[sst.acked_idx:]
if messages:
ids = RangedSet()
disposed = [(DEFAULT_DISPOSITION, [])]
acked = []
for m in messages:
# XXX: we're ignoring acks that get lost when disconnected,
# could we deal this via some message-id based purge?
if m._transfer_id is None:
acked.append(m)
continue
ids.add(m._transfer_id)
if m._receiver._accept_mode is accept_mode.explicit:
disp = m._disposition or DEFAULT_DISPOSITION
last, msgs = disposed[-1]
if disp.type is last.type and disp.options == last.options:
msgs.append(m)
else:
disposed.append((disp, [m]))
else:
acked.append(m)
for range in ids:
sst.executed.add_range(range)
sst.write_op(SessionCompleted(sst.executed))
def ack_acker(msgs):
def ack_ack():
for m in msgs:
ssn.acked.remove(m)
sst.acked_idx -= 1
# XXX: should this check accept_mode too?
if not ssn.transactional:
sst.acked.remove(m)
return ack_ack
for disp, msgs in disposed:
if not msgs: continue
if disp.type is None:
op = MessageAccept
elif disp.type is RELEASED:
op = MessageRelease
elif disp.type is REJECTED:
op = MessageReject
sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]),
**disp.options),
ack_acker(msgs))
if log.isEnabledFor(DEBUG):
for m in msgs:
log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)
sst.acked.extend(messages)
sst.acked_idx += len(messages)
ack_acker(acked)()
if ssn.committing and not sst.committing:
def commit_ok():
del sst.acked[:]
ssn.committing = False
ssn.committed = True
ssn.aborting = False
ssn.aborted = False
sst.committing = False
sst.write_cmd(TxCommit(), commit_ok)
sst.committing = True
if ssn.aborting and not sst.aborting:
sst.aborting = True
def do_rb():
messages = sst.acked + ssn.unacked + ssn.incoming
ids = RangedSet(*[m._transfer_id for m in messages])
for range in ids:
sst.executed.add_range(range)
sst.write_op(SessionCompleted(sst.executed))
sst.write_cmd(MessageRelease(ids, True))
sst.write_cmd(TxRollback(), do_rb_ok)
def do_rb_ok():
del ssn.incoming[:]
del ssn.unacked[:]
del sst.acked[:]
for rcv in ssn.receivers:
rcv.impending = rcv.received
rcv.returned = rcv.received
# XXX: do we need to update granted here as well?
for rcv in ssn.receivers:
self.process_receiver(rcv)
ssn.aborting = False
ssn.aborted = True
ssn.committing = False
ssn.committed = False
sst.aborting = False
for rcv in ssn.receivers:
_rcv = self._attachments[rcv]
sst.write_cmd(MessageStop(_rcv.destination))
sst.write_cmd(ExecutionSync(), do_rb)