def process()

in qpid/messaging/driver.py [0:0]


  def process(self, ssn):
    """
    Push the pending work of session ``ssn`` through its attached state.

    Does nothing if the session is closed or closing. Otherwise, in order:

    1. sends queued outgoing messages whose sender is attached and linked,
    2. writes an ExecutionSync for senders that satisfy the sync condition,
    3. runs process_receiver() on every receiver,
    4. reports newly acked messages (SessionCompleted plus the matching
       accept/release/reject commands),
    5. starts a transaction commit if one was requested,
    6. starts a transaction rollback if one was requested.

    The method returns nothing; its effects are the commands/ops written
    through the session state and the bookkeeping updated on ``ssn`` and on
    the session state object.
    """
    if ssn.closed or ssn.closing: return

    # Driver-side state kept for this session.
    sst = self._attachments[ssn]

    # Send outgoing messages strictly in queue order, resuming where the
    # previous call stopped (sst.outgoing_idx). Stop at the first message
    # whose sender has no attachment yet or is not linked, so later messages
    # never overtake it.
    while sst.outgoing_idx < len(ssn.outgoing):
      msg = ssn.outgoing[sst.outgoing_idx]
      snd = msg._sender
      # XXX: should check for sender error here
      _snd = self._attachments.get(snd)
      if _snd and snd.linked:
        self.send(snd, msg)
        sst.outgoing_idx += 1
      else:
        break

    # One ExecutionSync is written per sender that meets the condition;
    # sync_noop is passed as the command's second argument.
    for snd in ssn.senders:
      # XXX: should include snd.acked in this
      if snd.synced >= snd.queued and sst.need_sync:
        sst.write_cmd(ExecutionSync(), sync_noop)

    for rcv in ssn.receivers:
      self.process_receiver(rcv)

    if ssn.acked:
      # Only the tail of ssn.acked starting at sst.acked_idx is examined;
      # the index is advanced below once these entries have been handled and
      # is decremented again whenever ack_ack() removes an entry.
      messages = ssn.acked[sst.acked_idx:]
      if messages:
        # Transfer ids of every message being reported in this pass.
        ids = RangedSet()

        # disposed: list of (disposition, [messages]) groups. Consecutive
        # messages with the same disposition type and options share a group,
        # so each group becomes a single command below. It is seeded with an
        # empty default-disposition group so disposed[-1] always exists.
        disposed = [(DEFAULT_DISPOSITION, [])]
        # acked: messages that need no disposition command and can be
        # cleaned up immediately.
        acked = []
        for m in messages:
          # XXX: we're ignoring acks that get lost when disconnected,
          # could we deal with this via some message-id based purge?
          if m._transfer_id is None:
            # No transfer id: nothing to report, only local cleanup.
            acked.append(m)
            continue
          ids.add(m._transfer_id)
          if m._receiver._accept_mode is accept_mode.explicit:
            disp = m._disposition or DEFAULT_DISPOSITION
            last, msgs = disposed[-1]
            if disp.type is last.type and disp.options == last.options:
              msgs.append(m)
            else:
              disposed.append((disp, [m]))
          else:
            # Non-explicit accept mode: the id is still reported as
            # completed, but no disposition command is written.
            acked.append(m)

        # Fold the new ids into the session's executed set and report it.
        for range in ids:
          sst.executed.add_range(range)
        sst.write_op(SessionCompleted(sst.executed))

        # Builds the cleanup callback for one batch of messages. The batch
        # is bound as a parameter so every callback keeps its own list.
        def ack_acker(msgs):
          def ack_ack():
            for m in msgs:
              ssn.acked.remove(m)
              # Keep acked_idx aligned with the shortened ssn.acked list.
              sst.acked_idx -= 1
              # XXX: should this check accept_mode too?
              if not ssn.transactional:
                # Transactional sessions keep the entry in sst.acked; it is
                # cleared by commit_ok()/do_rb_ok() further down.
                sst.acked.remove(m)
          return ack_ack

        for disp, msgs in disposed:
          if not msgs: continue
          # Map the disposition type to the command class to write.
          # NOTE(review): there is no else branch; a type other than None,
          # RELEASED or REJECTED would leave `op` unbound (or carrying the
          # previous iteration's value). Confirm no other type can occur.
          if disp.type is None:
            op = MessageAccept
          elif disp.type is RELEASED:
            op = MessageRelease
          elif disp.type is REJECTED:
            op = MessageReject
          # ack_acker(msgs) is write_cmd's second argument -- presumably run
          # when the command completes; confirm in write_cmd.
          sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]),
                           **disp.options),
                        ack_acker(msgs))
          if log.isEnabledFor(DEBUG):
            for m in msgs:
              log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)

        # Order matters here: the messages must be in sst.acked and counted
        # in acked_idx before the immediate cleanup runs, because ack_ack()
        # decrements the index and (when not transactional) removes the
        # entries from sst.acked again.
        sst.acked.extend(messages)
        sst.acked_idx += len(messages)
        ack_acker(acked)()

    # Start a commit once per request; sst.committing stays set until
    # commit_ok() runs, so repeated calls do not write TxCommit again.
    if ssn.committing and not sst.committing:
      def commit_ok():
        # Passed to write_cmd with TxCommit: forget the acked messages held
        # in sst.acked and mark the session committed.
        del sst.acked[:]
        ssn.committing = False
        ssn.committed = True
        ssn.aborting = False
        ssn.aborted = False
        sst.committing = False
      sst.write_cmd(TxCommit(), commit_ok)
      sst.committing = True

    # Start a rollback once per request. The sequence is chained through the
    # second argument of write_cmd:
    #   MessageStop per receiver -> ExecutionSync -> do_rb()
    #   -> SessionCompleted, MessageRelease, TxRollback -> do_rb_ok()
    if ssn.aborting and not sst.aborting:
      sst.aborting = True
      def do_rb():
        # Release every message held on this side: acked ones tracked in
        # sst.acked, plus the session's unacked and incoming messages.
        messages = sst.acked + ssn.unacked + ssn.incoming
        ids = RangedSet(*[m._transfer_id for m in messages])
        for range in ids:
          sst.executed.add_range(range)
        sst.write_op(SessionCompleted(sst.executed))
        # NOTE(review): the second positional argument (True) is presumably
        # the set-redelivered flag -- confirm against MessageRelease.
        sst.write_cmd(MessageRelease(ids, True))
        sst.write_cmd(TxRollback(), do_rb_ok)

      def do_rb_ok():
        # Passed to write_cmd with TxRollback: drop all locally held
        # messages and reset the receivers' counters.
        del ssn.incoming[:]
        del ssn.unacked[:]
        del sst.acked[:]

        for rcv in ssn.receivers:
          rcv.impending = rcv.received
          rcv.returned = rcv.received
          # XXX: do we need to update granted here as well?

        for rcv in ssn.receivers:
          self.process_receiver(rcv)

        ssn.aborting = False
        ssn.aborted = True
        ssn.committing = False
        ssn.committed = False
        sst.aborting = False

      # Write a MessageStop for every receiver's destination, then an
      # ExecutionSync with do_rb so the release and rollback follow it.
      for rcv in ssn.receivers:
        _rcv = self._attachments[rcv]
        sst.write_cmd(MessageStop(_rcv.destination))
      sst.write_cmd(ExecutionSync(), do_rb)