def _v1DispatchProtected()

in management/python/lib/qmf/console.py [0:0]


  def _v1DispatchProtected(self, msg):
    """
    This is the general message handler for messages received via the QMFv1 exchanges.
    """
    try:
      agent = None
      agent_addr = None
      mp = msg.get("message_properties")
      ah = mp.application_headers
      if ah and 'qmf.agent' in ah:
        agent_addr = ah['qmf.agent']

      if not agent_addr:
        #
        # See if we can determine the agent identity from the routing key
        #
        dp = msg.get("delivery_properties")
        rkey = None
        if dp and dp.routing_key:
          rkey = dp.routing_key
          items = rkey.split('.')
          if len(items) >= 4:
            if items[0] == 'console' and items[3].isdigit():
              agent_addr = str(items[3])  # The QMFv1 Agent Bank
      if agent_addr != None and agent_addr in self.agents:
        agent = self.agents[agent_addr]

      codec = Codec(msg.body)
      alreadyTried = None
      while True:
        opcode, seq = self._checkHeader(codec)

        if not agent and not alreadyTried:
          alreadyTried = True
          try:
            self.cv.acquire()
            if seq in self.seqToAgentMap:
              agent = self.seqToAgentMap[seq]
          finally:
            self.cv.release()

        if   opcode == None: break
        if   opcode == 'b': self.session._handleBrokerResp      (self, codec, seq)
        elif opcode == 'p': self.session._handlePackageInd      (self, codec, seq)
        elif opcode == 'q': self.session._handleClassInd        (self, codec, seq)
        elif opcode == 's': self.session._handleSchemaResp      (self, codec, seq, agent_addr)
        elif opcode == 'h': self.session._handleHeartbeatInd    (self, codec, seq, msg)
        elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq, agent)
        elif agent:
          agent._handleQmfV1Message(opcode, seq, mp, ah, codec)
          agent.touch() # mark agent as being alive

    finally:  # always ack the message!
      try:
        # ignore failures as the session may be shutting down...
        self.amqpSession.receiver._completed.add(msg.id)
        self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed)
      except:
        pass