in management/python/lib/qmf/console.py [0:0]
def _v1DispatchProtected(self, msg):
"""
This is the general message handler for messages received via the QMFv1 exchanges.
"""
try:
agent = None
agent_addr = None
mp = msg.get("message_properties")
ah = mp.application_headers
if ah and 'qmf.agent' in ah:
agent_addr = ah['qmf.agent']
if not agent_addr:
#
# See if we can determine the agent identity from the routing key
#
dp = msg.get("delivery_properties")
rkey = None
if dp and dp.routing_key:
rkey = dp.routing_key
items = rkey.split('.')
if len(items) >= 4:
if items[0] == 'console' and items[3].isdigit():
agent_addr = str(items[3]) # The QMFv1 Agent Bank
if agent_addr != None and agent_addr in self.agents:
agent = self.agents[agent_addr]
codec = Codec(msg.body)
alreadyTried = None
while True:
opcode, seq = self._checkHeader(codec)
if not agent and not alreadyTried:
alreadyTried = True
try:
self.cv.acquire()
if seq in self.seqToAgentMap:
agent = self.seqToAgentMap[seq]
finally:
self.cv.release()
if opcode == None: break
if opcode == 'b': self.session._handleBrokerResp (self, codec, seq)
elif opcode == 'p': self.session._handlePackageInd (self, codec, seq)
elif opcode == 'q': self.session._handleClassInd (self, codec, seq)
elif opcode == 's': self.session._handleSchemaResp (self, codec, seq, agent_addr)
elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg)
elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq, agent)
elif agent:
agent._handleQmfV1Message(opcode, seq, mp, ah, codec)
agent.touch() # mark agent as being alive
finally: # always ack the message!
try:
# ignore failures as the session may be shutting down...
self.amqpSession.receiver._completed.add(msg.id)
self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed)
except:
pass