management/python/lib/qpidtoollibs/broker.py (342 lines of code) (raw):

# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import sys from qpidtoollibs.disp import TimeLong try: from uuid import uuid4 except ImportError: from qpid.datatypes import uuid4 class BrokerAgent(object): """ Proxy for a manageable Qpid Broker - Invoke with an opened qpid.messaging.Connection or qpid_messaging.Connection """ def __init__(self, conn): # Use the Message class from the same module as conn which could be qpid.messaging # or qpid_messaging self.message_class = sys.modules[conn.__class__.__module__].Message self.conn = conn self.sess = self.conn.session() self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}}" % str(uuid4()) self.reply_rx = self.sess.receiver(self.reply_to) self.reply_rx.capacity = 10 self.tx = self.sess.sender("qmf.default.direct/broker") self.next_correlator = 1 def close(self): """ Close the proxy session. This will not affect the connection used in creating the object. """ self.sess.close() def _method(self, method, arguments=None, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10): props = {'method' : 'request', 'qmf.opcode' : '_method_request', 'x-amqp-0-10.app-id' : 'qmf2'} correlator = str(self.next_correlator) self.next_correlator += 1 content = {'_object_id' : {'_object_name' : addr}, '_method_name' : method, '_arguments' : arguments or {}} message = self.message_class( content, reply_to=self.reply_to, correlation_id=correlator, properties=props, subject="broker") self.tx.send(message) response = self.reply_rx.fetch(timeout) self.sess.acknowledge() if response.properties['qmf.opcode'] == '_exception': raise Exception("Exception from Agent: %r" % response.content['_values']) if response.properties['qmf.opcode'] != '_method_response': raise Exception("bad response: %r" % response.properties) return response.content['_arguments'] def _sendRequest(self, opcode, content): props = {'method' : 'request', 'qmf.opcode' : opcode, 'x-amqp-0-10.app-id' : 'qmf2'} correlator = str(self.next_correlator) self.next_correlator += 1 message = self.message_class( content, reply_to=self.reply_to, correlation_id=correlator, properties=props, subject="broker") self.tx.send(message) return correlator def _doClassQuery(self, class_name): query = {'_what' : 'OBJECT', '_schema_id' : {'_class_name' : class_name}} correlator = self._sendRequest('_query_request', query) response = self.reply_rx.fetch(10) if response.properties['qmf.opcode'] != '_query_response': raise Exception("bad response") items = [] done = False while not done: for item in response.content: items.append(item) if 'partial' in response.properties: response = self.reply_rx.fetch(10) else: done = True self.sess.acknowledge() return items def _doNameQuery(self, object_id): query = {'_what' : 'OBJECT', '_object_id' : {'_object_name' : object_id}} correlator = self._sendRequest('_query_request', query) response = self.reply_rx.fetch(10) if response.properties['qmf.opcode'] != '_query_response': raise Exception("bad response") items = [] done = False while not done: for item in response.content: items.append(item) if 'partial' in response.properties: response = self.reply_rx.fetch(10) else: done = True self.sess.acknowledge() if len(items) == 1: return items[0] return None def _getAllBrokerObjects(self, cls): items = self._doClassQuery(cls.__name__.lower()) objs = [] for item in items: objs.append(cls(self, item)) return objs def _getBrokerObject(self, cls, oid): obj = self._doNameQuery(oid) if obj: return cls(self, obj) return None def _getSingleObject(self, cls): # # getAllBrokerObjects is used instead of getBrokerObject(Broker, 'amqp-broker') because # of a bug that used to be in the broker whereby by-name queries did not return the # object timestamps. # objects = self._getAllBrokerObjects(cls) if objects: return objects[0] return None def getBroker(self): """ Get the Broker object that contains broker-scope statistics and operations. """ return self._getSingleObject(Broker) def getCluster(self): return self._getSingleObject(Cluster) def getHaBroker(self): return self._getSingleObject(HaBroker) def getAllConnections(self): return self._getAllBrokerObjects(Connection) def getConnection(self, oid): return self._getBrokerObject(Connection, "org.apache.qpid.broker:connection:%s" % oid) def getAllSessions(self): return self._getAllBrokerObjects(Session) def getSession(self, oid): return self._getBrokerObject(Session, "org.apache.qpid.broker:session:%s" % oid) def getAllSubscriptions(self): return self._getAllBrokerObjects(Subscription) def getSubscription(self, oid): return self._getBrokerObject(Subscription, "org.apache.qpid.broker:subscription:%s" % oid) def getAllExchanges(self): return self._getAllBrokerObjects(Exchange) def getExchange(self, name): return self._getBrokerObject(Exchange, "org.apache.qpid.broker:exchange:%s" % name) def getAllQueues(self): return self._getAllBrokerObjects(Queue) def getQueue(self, name): return self._getBrokerObject(Queue, "org.apache.qpid.broker:queue:%s" % name) def getAllBindings(self): return self._getAllBrokerObjects(Binding) def getAllLinks(self): return self._getAllBrokerObjects(Link) def getAcl(self): return self._getSingleObject(Acl) def getMemory(self): return self._getSingleObject(Memory) def echo(self, sequence = 1, body = "Body"): """Request a response to test the path to the management broker""" args = {'sequence' : sequence, 'body' : body} return self._method('echo', args) def connect(self, host, port, durable, authMechanism, username, password, transport): """Establish a connection to another broker""" pass def queueMoveMessages(self, srcQueue, destQueue, qty): """Move messages from one queue to another""" self._method("queueMoveMessages", {'srcQueue':srcQueue,'destQueue':destQueue,'qty':qty}) def queueRedirect(self, sourceQueue, targetQueue): """Enable/disable delivery redirect for indicated queues""" self._method("queueRedirect", {'sourceQueue':sourceQueue,'targetQueue':targetQueue}) def setLogLevel(self, level): """Set the log level""" self._method("setLogLevel", {'level':level}) def getLogLevel(self): """Get the log level""" return self._method('getLogLevel') def setTimestampConfig(self, receive): """Set the message timestamping configuration""" self._method("setTimestampConfig", {'receive':receive}) def getTimestampConfig(self): """Get the message timestamping configuration""" return self._method('getTimestampConfig') def setLogHiresTimestamp(self, logHires): """Set the high resolution timestamp in logs""" self._method("setLogHiresTimestamp", {'logHires':logHires}) def getLogHiresTimestamp(self): """Get the high resolution timestamp in logs""" return self._method('getLogHiresTimestamp') def addExchange(self, exchange_type, name, options={}, **kwargs): properties = {} properties['exchange-type'] = exchange_type for k,v in options.items(): properties[k] = v for k,v in kwargs.items(): properties[k] = v args = {'type': 'exchange', 'name': name, 'properties': properties, 'strict': True} self._method('create', args) def delExchange(self, name): args = {'type': 'exchange', 'name': name} self._method('delete', args) def addQueue(self, name, options={}, **kwargs): properties = options for k,v in kwargs.items(): properties[k] = v args = {'type': 'queue', 'name': name, 'properties': properties, 'strict': True} self._method('create', args) def delQueue(self, name, if_empty=True, if_unused=True): options = {'if_empty': if_empty, 'if_unused': if_unused} args = {'type': 'queue', 'name': name, 'options': options} self._method('delete', args) def bind(self, exchange, queue, key="", options={}, **kwargs): properties = options for k,v in kwargs.items(): properties[k] = v args = {'type': 'binding', 'name': "%s/%s/%s" % (exchange, queue, key), 'properties': properties, 'strict': True} self._method('create', args) def unbind(self, exchange, queue, key, **kwargs): args = {'type': 'binding', 'name': "%s/%s/%s" % (exchange, queue, key), 'strict': True} self._method('delete', args) def reloadAclFile(self): self._method('reloadACLFile', {}, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") def acl_lookup(self, userName, action, aclObj, aclObjName, propMap): args = {'userId': userName, 'action': action, 'object': aclObj, 'objectName': aclObjName, 'propertyMap': propMap} return self._method('Lookup', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") def acl_lookupPublish(self, userName, exchange, key): args = {'userId': userName, 'exchangeName': exchange, 'routingKey': key} return self._method('LookupPublish', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") def Redirect(self, sourceQueue, targetQueue): args = {'sourceQueue': sourceQueue, 'targetQueue': targetQueue} return self._method('queueRedirect', args, "org.apache.qpid.broker:broker:amqp-broker") def create(self, _type, name, properties={}, strict=False): """Create an object of the specified type""" args = {'type': _type, 'name': name, 'properties': properties, 'strict': strict} return self._method('create', args) def delete(self, _type, name, options): """Delete an object of the specified type""" args = {'type': _type, 'name': name, 'options': options} return self._method('delete', args) def list(self, _type): """List objects of the specified type""" return [i["_values"] for i in self._doClassQuery(_type.lower())] def query(self, _type, oid): """Query the current state of an object""" return self._getBrokerObject(self, _type, oid) class EventHelper(object): def eventAddress(self, pkg='*', cls='*', sev='*'): return "qmf.default.topic/agent.ind.event.%s.%s.%s.#" % (pkg.replace('.', '_'), cls, sev) def event(self, msg): return BrokerEvent(msg) class BrokerEvent(object): def __init__(self, msg): self.msg = msg self.content = msg.content[0] self.values = self.content['_values'] self.schema_id = self.content['_schema_id'] self.name = "%s:%s" % (self.schema_id['_package_name'], self.schema_id['_class_name']) def __repr__(self): rep = "%s %s" % (TimeLong(self.getTimestamp()), self.name) for k,v in self.values.items(): rep = rep + " %s=%s" % (k, v) return rep def __getattr__(self, key): if key not in self.values: return None value = self.values[key] return value def getAttributes(self): return self.values def getTimestamp(self): return self.content['_timestamp'] class BrokerObject(object): def __init__(self, broker, content): self.broker = broker self.content = content self.values = content['_values'] def __getattr__(self, key): if key not in self.values: return None value = self.values[key] if value.__class__ == dict and '_object_name' in value: full_name = value['_object_name'] colon = full_name.find(':') if colon > 0: full_name = full_name[colon+1:] colon = full_name.find(':') if colon > 0: return full_name[colon+1:] return value def getObjectId(self): return self.content['_object_id']['_object_name'] def getAttributes(self): return self.values def getCreateTime(self): return self.content['_create_ts'] def getDeleteTime(self): return self.content['_delete_ts'] def getUpdateTime(self): return self.content['_update_ts'] def update(self): """ Reload the property values from the agent. """ refreshed = self.broker._getBrokerObject(self.__class__, self.getObjectId()) if refreshed: self.content = refreshed.content self.values = self.content['_values'] else: raise Exception("No longer exists on the broker") class Broker(BrokerObject): def __init__(self, broker, values): BrokerObject.__init__(self, broker, values) class Cluster(BrokerObject): def __init__(self, broker, values): BrokerObject.__init__(self, broker, values) class HaBroker(BrokerObject): def __init__(self, broker, values): BrokerObject.__init__(self, broker, values) class Memory(BrokerObject): def __init__(self, broker, values): BrokerObject.__init__(self, broker, values) class Connection(BrokerObject): def __init__(self, broker, values): BrokerObject.__init__(self, broker, values) def close(self): self.broker._method("close", {}, "org.apache.qpid.broker:connection:%s" % self.address) class Session(BrokerObject): def __init__(self, broker, values): BrokerObject.__init__(self, broker, values) class Subscription(BrokerObject): def __init__(self, broker, values): BrokerObject.__init__(self, broker, values) def __repr__(self): return "subscription name undefined" class Exchange(BrokerObject): def __init__(self, broker, values): BrokerObject.__init__(self, broker, values) class Binding(BrokerObject): def __init__(self, broker, values): BrokerObject.__init__(self, broker, values) def __repr__(self): return "Binding key: %s" % self.values['bindingKey'] class Queue(BrokerObject): def __init__(self, broker, values): BrokerObject.__init__(self, broker, values) def purge(self, request): """Discard all or some messages on a queue""" self.broker._method("purge", {'request':request}, "org.apache.qpid.broker:queue:%s" % self.name) def reroute(self, request, useAltExchange, exchange, filter={}): """Remove all or some messages on this queue and route them to an exchange""" self.broker._method("reroute", {'request':request,'useAltExchange':useAltExchange,'exchange':exchange,'filter':filter}, "org.apache.qpid.broker:queue:%s" % self.name) class Link(BrokerObject): def __init__(self, broker, values): BrokerObject.__init__(self, broker, values) class Acl(BrokerObject): def __init__(self, broker, values): BrokerObject.__init__(self, broker, values)