management/python/lib/qmf/client.py (343 lines of code) (raw):

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License
#

"""
AMQP 1.0 QMF client for the Qpid C++ broker.

This client is based on the Qpid-Proton library which only supports AMQP 1.0,
it is intended for forward-looking projects that want to move to the newer
client libraries. One important feature is that it does not start background
threads, which makes it more suitable for environments that may fork.
"""

from proton import Message
from proton.utils import BlockingConnection, IncomingMessageHandler, ConnectionException
import threading
import struct
import time


class ReconnectDelays(object):
    """
    An iterable that returns a (possibly unlimited) sequence of delays
    for successive reconnect attempts.
    """

    def __init__(self, shortest, longest, repeat=True):
        """
        First delay is 0, then `shortest`. Successive delays are doubled up to a
        maximum of `longest`. If `repeat` is a number > 0 then the `longest`
        value is generated `repeat` more times. If `repeat` is True, the longest
        value is returned without limit.

        For example: ReconnectDelays(.125, 1, True) will generate the following
        sequence:

            0, .125, .25, .5, 1, 1, 1, 1, 1, 1 ... (forever)

        @param shortest: First non-zero delay, must be > 0 and <= `longest`.
        @param longest: Upper bound for the generated delays.
        @param repeat: True for an endless sequence, otherwise the number of
            extra times `longest` is generated (0 or False for none).
        @raise ValueError: If the arguments are out of range.
        """
        if shortest <= 0 or shortest > longest or repeat < 0:
            raise ValueError("invalid arguments for reconnect_delays()")
        self.shortest, self.longest, self.repeat = shortest, longest, repeat

    def _generate(self):
        """Generator producing the delay sequence described in __init__."""
        yield 0
        delay = self.shortest
        while delay < self.longest:
            yield delay
            delay *= 2
        yield self.longest
        # `True` must be tested by identity: it means "forever", whereas the
        # integer 1 means "one more time".
        if self.repeat is True:
            while True:
                yield self.longest
        elif self.repeat:
            # range() rather than xrange() so this works on Python 2 and 3.
            for _ in range(self.repeat):
                yield self.longest

    def __iter__(self):
        """Return a fresh generator, so the object can be iterated repeatedly."""
        return self._generate()


class SyncRequestResponse(IncomingMessageHandler):
    """
    Implementation of the synchronous request-response (aka RPC) pattern.

    @ivar address: Address for all requests, may be None.
    @ivar connection: Connection for requests and responses.
    """

    def __init__(self, connection, address=None):
        """
        Send requests and receive responses. A single instance can send many
        requests to the same or different addresses.

        @param connection: A L{BlockingConnection}
        @param address: Address for all requests. If not specified, each
            request must have the address property set. Successive messages
            may have different addresses.
        """
        super(SyncRequestResponse, self).__init__()
        self.address = address
        self.response = None
        self._cid = 0
        self.lock = threading.Lock()
        self.reconnect(connection)

    def reconnect(self, connection):
        """
        Switch to `connection`, creating a new sender and reply receiver on it.

        @param connection: A L{BlockingConnection}
        """
        self.connection = connection
        self.sender = self.connection.create_sender(self.address)
        # dynamic=true generates a unique address dynamically for this receiver.
        # credit=1 because we want to receive 1 response message initially.
        self.receiver = self.connection.create_receiver(
            None, dynamic=True, credit=1, handler=self)

    def _next(self):
        """Get the next correlation ID"""
        with self.lock:
            self._cid += 1
            # latin-1 maps every byte value to a character, so this yields the
            # same string as the former ASCII decode for the first 127 IDs but
            # does not raise UnicodeDecodeError once a byte reaches 0x80.
            return struct.pack("I", self._cid).decode('latin-1')

    def call(self, request):
        """
        Send a request message, wait for and return the response message.

        @param request: A L{proton.Message}. If L{self.address} is not set the
            request's own address must be set and will be used.
        @return: The response L{proton.Message}.
        """
        return self.wait(self.send(request))

    def send(self, request):
        """
        Send a request and return the correlation_id immediately. Use wait()
        to get the response.

        @param request: A L{proton.Message}. If L{self.address} is not set the
            request's own address must be set and will be used.
        @return: The correlation ID assigned to the request.
        @raise ValueError: If neither L{self.address} nor the request's
            address is set.
        """
        if not self.address and not request.address:
            raise ValueError("Request message has no address: %s" % request)
        request.reply_to = self.reply_to
        request.correlation_id = self._next()
        self.sender.send(request)
        return request.correlation_id

    def wait(self, correlation_id):
        """
        Wait for and return a single response to a request previously sent
        with send()

        @param correlation_id: The value returned by send().
        @return: The response L{proton.Message}.
        """
        def wakeup():
            return self.response and (self.response.correlation_id == correlation_id)

        self.connection.wait(wakeup, msg="Waiting for response")
        response = self.response
        self.response = None    # Ready for next response.
        self.receiver.flow(1)   # Set up credit for the next response.
        return response

    @property
    def reply_to(self):
        """Return the dynamic address of our receiver."""
        return self.receiver.remote_source.address

    def on_message(self, event):
        """Called when we receive a message for our receiver."""
        self.response = event.message
        self.connection.container.yield_()  # Wake up the wait() loop to handle the message.


class BrokerAgent(object):
    """Proxy for a manageable Qpid broker"""

    @staticmethod
    def connection(url=None, timeout=10, ssl_domain=None, sasl=None):
        """
        Return a BlockingConnection suitable for use with a BrokerAgent.

        @param sasl: Optional object providing `mechs`, `user` and `password`
            attributes; each is converted with str() before being passed on.
        """
        return BlockingConnection(url,
                                  timeout=timeout,
                                  ssl_domain=ssl_domain,
                                  allowed_mechs=str(sasl.mechs) if sasl else None,
                                  user=str(sasl.user) if sasl else None,
                                  password=str(sasl.password) if sasl else None)

    @staticmethod
    def connect(url=None, timeout=10, ssl_domain=None, sasl=None, reconnect_delays=None):
        """
        Return a BrokerAgent connected with the given parameters.

        @param reconnect_delays: iterable of delays for successive automatic
            re-connect attempts, see class ReconnectDelays. If None there is no
            automatic re-connect
        """
        f = lambda: BrokerAgent.connection(url, timeout, ssl_domain, sasl)
        ba = BrokerAgent(f())
        # Remember how to build a connection so _reconnect() can make new ones.
        ba.make_connection = f
        ba.reconnect_delays = reconnect_delays or []
        return ba

    def __init__(self, connection):
        """
        Create a management node proxy using the given connection.

        @param connection: a L{BlockingConnection} to the management agent.
        """
        path = connection.url.path or "qmf.default.direct"
        self._client = SyncRequestResponse(connection, path)
        self.reconnect_delays = None

    def reconnect(self, connection):
        """Continue operating over `connection`, a L{BlockingConnection}."""
        self._client.reconnect(connection)

    def close(self):
        """Shut down the node"""
        if self._client:
            self._client.connection.close()
            self._client = None

    def __repr__(self):
        # close() sets _client to None; repr() must still work afterwards.
        if self._client is None:
            return "%s(closed)" % self.__class__.__name__
        return "%s(%s)" % (self.__class__.__name__, self._client.connection.url)

    def _reconnect(self):
        """
        Try to re-connect, sleeping for each delay in `reconnect_delays` before
        the corresponding attempt.

        @return: True once an attempt succeeds, False if automatic re-connect
            is not configured or every attempt failed.
        """
        # An agent built directly with __init__ (not connect()) has neither a
        # connection factory nor delays. Report failure so that _retry()
        # re-raises the original ConnectionException instead of a TypeError or
        # AttributeError.
        make_connection = getattr(self, 'make_connection', None)
        if make_connection is None:
            return False
        for d in self.reconnect_delays or ():
            time.sleep(d)
            try:
                self.reconnect(make_connection())
                return True
            except ConnectionException:
                # Deliberate: a failed attempt just moves on to the next delay.
                pass
        return False

    def _retry(self, f, *args, **kwargs):
        """
        Call f(*args, **kwargs) and return its result. On ConnectionException
        re-connect and call again; re-raise if re-connecting fails.
        """
        while True:
            try:
                return f(*args, **kwargs)
            except ConnectionException:
                if not self._reconnect():
                    raise

    def _request(self, opcode, content):
        """Send a QMF request with the given opcode and body, return the response message."""
        props = {u'method': u'request',
                 u'qmf.opcode': opcode,
                 u'x-amqp-0-10.app-id': u'qmf2'}
        return self._client.call(Message(body=content, properties=props, subject="broker"))

    def _method(self, method, arguments=None, addr="org.apache.qpid.broker:broker:amqp-broker"):
        """
        Send a QMF method request and return the arguments of the response.

        @param method: Name of the method to invoke.
        @param arguments: Map of method arguments, defaults to an empty map.
        @param addr: Object name of the object the method is invoked on.
        @return: The '_arguments' entry of the response body.
        @raise Exception: If the response is a QMF exception or has an
            unexpected opcode.
        """
        content = {'_object_id': {'_object_name': addr},
                   '_method_name': method,
                   '_arguments': arguments or {}}
        response = self._retry(self._request, u'_method_request', content)
        if response.properties[u'qmf.opcode'] == u'_exception':
            raise Exception("management error: %r" % response.body['_values'])
        if response.properties[u'qmf.opcode'] != u'_method_response':
            raise Exception("bad response: %r" % response.properties)
        return response.body['_arguments']

    def _gather(self, response):
        """
        Collect the items of a query response that may arrive in several parts.

        While the latest message carries a 'partial' property, wait for the
        next message with the same correlation ID and append its body.

        @param response: The first response message.
        @return: The concatenated bodies of all parts.
        """
        items = response.body
        while u'partial' in response.properties:
            response = self._client.wait(response.correlation_id)
            items += response.body
        return items

    def _classQuery(self, class_name):
        """Return the list of objects reported for schema class `class_name`."""
        query = {'_what': 'OBJECT', '_schema_id': {'_class_name': class_name}}

        def f():
            response = self._request(u'_query_request', query)
            if response.properties[u'qmf.opcode'] != u'_query_response':
                raise Exception("bad response")
            return self._gather(response)

        return self._retry(f)

    def _nameQuery(self, object_id):
        """
        Query for the object named `object_id`.

        @return: The object if exactly one is reported, otherwise None.
        """
        query = {'_what': 'OBJECT', '_object_id': {'_object_name': object_id}}

        def f():
            response = self._request(u'_query_request', query)
            if response.properties[u'qmf.opcode'] != u'_query_response':
                raise Exception("bad response")
            items = self._gather(response)
            if len(items) == 1:
                return items[0]
            return None

        return self._retry(f)

    def _getAll(self, cls):
        """Wrap every object of the class named after `cls` (lower-cased) in `cls`."""
        return [cls(self, x) for x in self._classQuery(cls.__name__.lower())]

    def _getSingle(self, cls):
        """Return the first object from _getAll(cls), or the empty list if there are none."""
        found = self._getAll(cls)
        return found and found[0]

    def _get(self, cls, oid):
        """Return cls wrapping the object named `oid`, or None if it was not found."""
        x = self._nameQuery(oid)
        return x and cls(self, x)

    def getBroker(self):
        return self._getSingle(Broker)

    def getCluster(self):
        return self._getSingle(Cluster)

    def getHaBroker(self):
        return self._getSingle(HaBroker)

    def getAllConnections(self):
        return self._getAll(Connection)

    def getConnection(self, oid):
        return self._get(Connection, "org.apache.qpid.broker:connection:%s" % oid)

    def getAllSessions(self):
        return self._getAll(Session)

    def getSession(self, oid):
        return self._get(Session, "org.apache.qpid.broker:session:%s" % oid)

    def getAllSubscriptions(self):
        return self._getAll(Subscription)

    def getSubscription(self, oid):
        return self._get(Subscription, "org.apache.qpid.broker:subscription:%s" % oid)

    def getAllExchanges(self):
        return self._getAll(Exchange)

    def getExchange(self, name):
        return self._get(Exchange, "org.apache.qpid.broker:exchange:%s" % name)

    def getAllQueues(self):
        return self._getAll(Queue)

    def getQueue(self, name):
        return self._get(Queue, "org.apache.qpid.broker:queue:%s" % name)

    def getAllBindings(self):
        return self._getAll(Binding)

    def getAllLinks(self):
        return self._getAll(Link)

    def getAcl(self):
        return self._getSingle(Acl)

    def getMemory(self):
        return self._getSingle(Memory)

    def echo(self, sequence = 1, body = "Body"):
        """Request a response to test the path to the management broker"""
        return self._method('echo', {'sequence': sequence, 'body': body})

    def queueMoveMessages(self, srcQueue, destQueue, qty):
        """Move messages from one queue to another"""
        self._method("queueMoveMessages",
                     {'srcQueue': srcQueue, 'destQueue': destQueue, 'qty': qty})

    def queueRedirect(self, sourceQueue, targetQueue):
        """Enable/disable delivery redirect for indicated queues"""
        self._method("queueRedirect",
                     {'sourceQueue': sourceQueue, 'targetQueue': targetQueue})

    def setLogLevel(self, level):
        """Set the log level"""
        self._method("setLogLevel", {'level': level})

    def getLogLevel(self):
        """Get the log level"""
        return self._method('getLogLevel')

    def setTimestampConfig(self, receive):
        """Set the message timestamping configuration"""
        self._method("setTimestampConfig", {'receive': receive})

    def getTimestampConfig(self):
        """Get the message timestamping configuration"""
        return self._method('getTimestampConfig')

    def setLogHiresTimestamp(self, logHires):
        """Set the high resolution timestamp in logs"""
        self._method("setLogHiresTimestamp", {'logHires': logHires})

    def getLogHiresTimestamp(self):
        """Get the high resolution timestamp in logs"""
        return self._method('getLogHiresTimestamp')

    def addExchange(self, exchange_type, name, options=None, **kwargs):
        """
        Create an exchange.

        @param exchange_type: Sent as the 'exchange-type' property.
        @param name: Name of the exchange.
        @param options: Optional map of further properties.
        @param kwargs: Further properties; these override `options`.
        """
        properties = {'exchange-type': exchange_type}
        properties.update(options or {})
        properties.update(kwargs)
        args = {'type': 'exchange', 'name': name,
                'properties': properties, 'strict': True}
        self._method('create', args)

    def delExchange(self, name):
        """Delete the exchange called `name`."""
        args = {'type': 'exchange', 'name': name}
        self._method('delete', args)

    def addQueue(self, name, options=None, **kwargs):
        """
        Create a queue.

        @param name: Name of the queue.
        @param options: Optional map of properties; it is not modified.
        @param kwargs: Further properties; these override `options`.
        """
        # Work on a copy: writing kwargs into `options` itself would leak them
        # into the caller's dict (or into a shared default) for later calls.
        properties = dict(options or {})
        properties.update(kwargs)
        args = {'type': 'queue', 'name': name,
                'properties': properties, 'strict': True}
        self._method('create', args)

    def delQueue(self, name, if_empty=True, if_unused=True):
        """Delete the queue called `name`, passing the two flags as delete options."""
        options = {'if_empty': if_empty, 'if_unused': if_unused}
        args = {'type': 'queue', 'name': name, 'options': options}
        self._method('delete', args)

    def bind(self, exchange, queue, key="", options=None, **kwargs):
        """
        Create a binding named "<exchange>/<queue>/<key>".

        @param options: Optional map of properties; it is not modified.
        @param kwargs: Further properties; these override `options`.
        """
        # Copy for the same reason as in addQueue().
        properties = dict(options or {})
        properties.update(kwargs)
        args = {'type': 'binding',
                'name': "%s/%s/%s" % (exchange, queue, key),
                'properties': properties,
                'strict': True}
        self._method('create', args)

    def unbind(self, exchange, queue, key, **kwargs):
        """Delete the binding named "<exchange>/<queue>/<key>"; `kwargs` is accepted but unused."""
        args = {'type': 'binding',
                'name': "%s/%s/%s" % (exchange, queue, key),
                'strict': True}
        self._method('delete', args)

    def reloadAclFile(self):
        """Invoke 'reloadACLFile' on the ACL object."""
        self._method('reloadACLFile', {},
                     "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")

    def acl_lookup(self, userName, action, aclObj, aclObjName, propMap):
        """Invoke 'Lookup' on the ACL object and return the response arguments."""
        args = {'userId': userName,
                'action': action,
                'object': aclObj,
                'objectName': aclObjName,
                'propertyMap': propMap}
        return self._method('Lookup', args,
                            "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")

    def acl_lookupPublish(self, userName, exchange, key):
        """Invoke 'LookupPublish' on the ACL object and return the response arguments."""
        args = {'userId': userName,
                'exchangeName': exchange,
                'routingKey': key}
        return self._method('LookupPublish', args,
                            "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")

    def Redirect(self, sourceQueue, targetQueue):
        """Same request as queueRedirect(), but returns the response arguments."""
        args = {'sourceQueue': sourceQueue, 'targetQueue': targetQueue}
        return self._method('queueRedirect', args,
                            "org.apache.qpid.broker:broker:amqp-broker")

    def create(self, _type, name, properties=None, strict=False):
        """Create an object of the specified type"""
        args = {'type': _type,
                'name': name,
                'properties': {} if properties is None else properties,
                'strict': strict}
        return self._method('create', args)

    def delete(self, _type, name, options):
        """Delete an object of the specified type"""
        args = {'type': _type, 'name': name, 'options': options}
        return self._method('delete', args)

    def list(self, _type):
        """List objects of the specified type"""
        return [i["_values"] for i in self._classQuery(_type.lower())]

    def query(self, _type, oid):
        """
        Query the current state of an object

        @param _type: A L{BrokerObject} subclass used to wrap the result. Any
            non-callable value (e.g. a type name string) falls back to the
            generic L{BrokerObject}.
        @param oid: Full object name, passed unchanged to the name query.
        @return: The wrapped object, or None if it was not found.
        """
        # _get() takes (cls, oid); the former extra `self` argument made every
        # call fail with TypeError.
        wrapper = _type if callable(_type) else BrokerObject
        return self._get(wrapper, oid)


class BrokerObject(object):
    """
    Wrapper for the content map of one managed object.

    Entries of the object's '_values' map can be read as attributes; a name
    that is not in the map reads as None.
    """

    def __init__(self, broker, content):
        """
        @param broker: The L{BrokerAgent} the object was obtained from.
        @param content: The content map of the object as returned by a query.
        """
        self.broker = broker
        self.content = content

    @property
    def values(self):
        """The '_values' map of the current content."""
        return self.content['_values']

    def __getattr__(self, key):
        # Only called when normal lookup fails. If `content` has not been set
        # yet (e.g. the instance was made without __init__), looking up
        # `values` would re-enter this method without end, so fail cleanly.
        if key in ('content', 'values'):
            raise AttributeError(key)
        return self.values.get(key)

    def getObjectId(self):
        return self.content['_object_id']['_object_name']

    def getAttributes(self):
        return self.values

    def getCreateTime(self):
        return self.content['_create_ts']

    def getDeleteTime(self):
        return self.content['_delete_ts']

    def getUpdateTime(self):
        return self.content['_update_ts']

    def update(self):
        """
        Reload the property values from the agent.

        @raise Exception: If the object can no longer be found.
        """
        refreshed = self.broker._get(self.__class__, self.getObjectId())
        if refreshed:
            # `values` is a read-only property derived from `content`, so
            # replacing `content` is all that is needed (assigning to `values`
            # raised AttributeError).
            self.content = refreshed.content
        else:
            raise Exception("No longer exists on the broker")

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self.content)

    def __str__(self):
        return self.getObjectId()


class Broker(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)


class Cluster(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)


class HaBroker(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)


class Memory(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)


class Connection(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)

    def close(self):
        """Invoke 'close' on this connection, addressed by its `address` value."""
        self.broker._method("close", {},
                            "org.apache.qpid.broker:connection:%s" % self.address)


class Session(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)


class Subscription(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)


class Exchange(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)


class Binding(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)


class Queue(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)

    def purge(self, request):
        """Discard all or some messages on a queue"""
        self.broker._method("purge", {'request': request},
                            "org.apache.qpid.broker:queue:%s" % self.name)

    def reroute(self, request, useAltExchange, exchange, filter=None):
        """Remove all or some messages on this queue and route them to an exchange"""
        self.broker._method("reroute",
                            {'request': request,
                             'useAltExchange': useAltExchange,
                             'exchange': exchange,
                             'filter': {} if filter is None else filter},
                            "org.apache.qpid.broker:queue:%s" % self.name)


class Link(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)


class Acl(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)