#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License
#

"""
AMQP 1.0 QMF client for the Qpid C++ broker.

This client is based on the Qpid-Proton library which only supports AMQP 1.0, it
is intended for forward-looking projects that want to move to the newer client
libraries. One important feature is that it does not start background threads,
which makes it more suitable for environments that may fork.
"""

from proton import Message
from proton.utils import BlockingConnection, IncomingMessageHandler, ConnectionException
import threading, struct, time

class ReconnectDelays(object):
    """
    An iterable that returns a (possibly unlimited) sequence of delays to for
    successive reconnect attempts.
    """

    def __init__(self, shortest, longest, repeat=True):
        """
        First delay is 0, then `shortest`. Successive delays are doubled up to a
        maximum of `longest`. If `repeat` is a number > 0 then the `longest` value
        is generated `repeat` more times.  If `repeat` is True, the longest
        value is returned without limit.

        For example: ReconnectDelays(.125, 1, True) will generate the following sequence:
            0, .125, .25, .5, 1, 1, 1, 1, 1, 1 ... (forever)
        """
        if shortest <= 0 or shortest > longest or repeat < 0:
            raise ValueError("invalid arguments for reconnect_delays()")
        self.shortest, self.longest, self.repeat = shortest, longest, repeat

    def _generate(self):
        yield 0
        delay = self.shortest
        while delay < self.longest:
            yield delay
            delay *= 2
        yield self.longest
        if self.repeat is True:
            while True:
                yield self.longest
        elif self.repeat:
            for i in xrange(self.repeat):
                yield self.longest

    def __iter__(self):
        return self._generate()


class SyncRequestResponse(IncomingMessageHandler):
    """
    Implementation of the synchronous request-responce (aka RPC) pattern.
    @ivar address: Address for all requests, may be None.
    @ivar connection: Connection for requests and responses.
    """

    def __init__(self, connection, address=None):
        """
        Send requests and receive responses. A single instance can send many requests
        to the same or different addresses.

        @param connection: A L{BlockingConnection}
        @param address: Address for all requests.
            If not specified, each request must have the address property set.
            Sucessive messages may have different addresses.
        """
        super(SyncRequestResponse, self).__init__()
        self.address = address
        self.response = None
        self._cid = 0
        self.lock = threading.Lock()
        self.reconnect(connection)

    def reconnect(self, connection):
        self.connection = connection
        self.sender = self.connection.create_sender(self.address)
        # dynamic=true generates a unique address dynamically for this receiver.
        # credit=1 because we want to receive 1 response message initially.
        self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self)

    def _next(self):
        """Get the next correlation ID"""
        self.lock.acquire()
        try:
            self._cid += 1;
            return struct.pack("I", self._cid).decode('ascii')
        finally:
            self.lock.release()

    def call(self, request):
        """
        Send a request message, wait for and return the response message.

        @param request: A L{proton.Message}. If L{self.address} is not set the 
            L{self.address} must be set and will be used.
        """
        return self.wait(self.send(request))

    def send(self, request):
        """
        Send a request and return the correlation_id immediately. Use wait() to get the response.
        @param request: A L{proton.Message}. If L{self.address} is not set the 
            L{self.address} must be set and will be used.
        """
        if not self.address and not request.address:
            raise ValueError("Request message has no address: %s" % request)
        request.reply_to = self.reply_to
        request.correlation_id = self._next()
        self.sender.send(request)
        return request.correlation_id

    def wait(self, correlation_id):
        """Wait for and return a single response to a request previously sent with send()"""
        def wakeup():
            return self.response and (self.response.correlation_id == correlation_id)
        self.connection.wait(wakeup, msg="Waiting for response")
        response = self.response
        self.response = None    # Ready for next response.
        self.receiver.flow(1)   # Set up credit for the next response.
        return response

    @property
    def reply_to(self):
        """Return the dynamic address of our receiver."""
        return self.receiver.remote_source.address

    def on_message(self, event):
        """Called when we receive a message for our receiver."""
        self.response = event.message
        self.connection.container.yield_() # Wake up the wait() loop to handle the message.


class BrokerAgent(object):
    """Proxy for a manageable Qpid broker"""

    @staticmethod
    def connection(url=None, timeout=10, ssl_domain=None, sasl=None):
        """Return a BlockingConnection suitable for use with a BrokerAgent."""
        return BlockingConnection(url,
                                  timeout=timeout,
                                  ssl_domain=ssl_domain,
                                  allowed_mechs=str(sasl.mechs) if sasl else None,
                                  user=str(sasl.user) if sasl else None,
                                  password=str(sasl.password) if sasl else None)

    @staticmethod
    def connect(url=None, timeout=10, ssl_domain=None, sasl=None, reconnect_delays=None):
        """
        Return a BrokerAgent connected with the given parameters.
        @param reconnect_delays: iterable of delays for successive automatic re-connect attempts,
        see class ReconnectDelays. If None there is no automatic re-connect
        """
        f = lambda: BrokerAgent.connection(url, timeout, ssl_domain, sasl)
        ba = BrokerAgent(f())
        ba.make_connection = f
        ba.reconnect_delays = reconnect_delays or []
        return ba

    def __init__(self, connection):
        """
        Create a management node proxy using the given connection.
        @param locales: Default list of locales for management operations.
        @param connection: a L{BlockingConnection} to the management agent.
        """
        path = connection.url.path or "qmf.default.direct"
        self._client = SyncRequestResponse(connection, path)
        self.reconnect_delays = None

    def reconnect(self, connection):
        self._client.reconnect(connection)

    def close(self):
        """Shut down the node"""
        if self._client:
            self._client.connection.close()
            self._client = None

    def __repr__(self):
        return "%s(%s)"%(self.__class__.__name__, self._client.connection.url)

    def _reconnect(self):
        for d in self.reconnect_delays:
            time.sleep(d)
            try:
                self.reconnect(self.make_connection())
                return True
            except ConnectionException:
                pass
        return False

    def _retry(self, f, *args, **kwargs):
        while True:
            try:
                return f(*args, **kwargs)
            except ConnectionException:
                if not self._reconnect():
                    raise

    def _request(self, opcode, content):
        props = {u'method'             : u'request',
                 u'qmf.opcode'         : opcode,
                 u'x-amqp-0-10.app-id' : u'qmf2'}
        return self._client.call(Message(body=content, properties=props, subject="broker"))

    def _method(self, method, arguments=None, addr="org.apache.qpid.broker:broker:amqp-broker"):
        """
        Make a L{proton.Message} containing a QMF method request.
        """
        content = {'_object_id'   : {'_object_name' : addr},
                   '_method_name' : method,
                   '_arguments'   : arguments or {}}
        response = self._retry(self._request, u'_method_request', content)
        if response.properties[u'qmf.opcode'] == u'_exception':
            raise Exception("management error: %r" % response.body['_values'])
        if response.properties[u'qmf.opcode'] != u'_method_response':
            raise Exception("bad response: %r" % response.properties)
        return response.body['_arguments']

    def _gather(self, response):
        items = response.body
        while u'partial' in response.properties:
            response = self._client.wait()
            items += self._client.wait(response.correlation_id).body
        return items

    def _classQuery(self, class_name):
        query = {'_what' : 'OBJECT', '_schema_id' : {'_class_name' : class_name}}
        def f():
            response = self._request(u'_query_request', query)
            if response.properties[u'qmf.opcode'] != u'_query_response':
                raise Exception("bad response")
            return self._gather(response)
        return self._retry(f)

    def _nameQuery(self, object_id):
        query = {'_what'      : 'OBJECT', '_object_id' : {'_object_name' : object_id}}
        def f():
            response = self._request(u'_query_request', query)
            if response.properties[u'qmf.opcode'] != u'_query_response':
                raise Exception("bad response")
            items = self._gather(response)
            if len(items) == 1:
                return items[0]
            return None
        return self._retry(f)

    def _getAll(self, cls):
        return [cls(self, x) for x in self._classQuery(cls.__name__.lower())]

    def _getSingle(self, cls):
        l = self._getAll(cls)
        return l and l[0]

    def _get(self, cls, oid):
        x = self._nameQuery(oid)
        return x and cls(self, x)

    def getBroker(self): return self._getSingle(Broker)
    def getCluster(self): return self._getSingle(Cluster)
    def getHaBroker(self): return self._getSingle(HaBroker)
    def getAllConnections(self): return self._getAll(Connection)
    def getConnection(self, oid): return self._get(Connection, "org.apache.qpid.broker:connection:%s" % oid)
    def getAllSessions(self): return self._getAll(Session)
    def getSession(self, oid): return self._get(Session, "org.apache.qpid.broker:session:%s" % oid)
    def getAllSubscriptions(self): return self._getAll(Subscription)
    def getSubscription(self, oid): return self._get(Subscription, "org.apache.qpid.broker:subscription:%s" % oid)
    def getAllExchanges(self): return self._getAll(Exchange)
    def getExchange(self, name): return self._get(Exchange, "org.apache.qpid.broker:exchange:%s" % name)
    def getAllQueues(self): return self._getAll(Queue)
    def getQueue(self, name): return self._get(Queue, "org.apache.qpid.broker:queue:%s" % name)
    def getAllBindings(self): return self._getAll(Binding)
    def getAllLinks(self): return self._getAll(Link)
    def getAcl(self): return self._getSingle(Acl)
    def getMemory(self): return self._getSingle(Memory)

    def echo(self, sequence = 1, body = "Body"):
        """Request a response to test the path to the management broker"""
        return self._method('echo', {'sequence' : sequence, 'body' : body})

    def queueMoveMessages(self, srcQueue, destQueue, qty):
        """Move messages from one queue to another"""
        self._method("queueMoveMessages", {'srcQueue':srcQueue,'destQueue':destQueue,'qty':qty})

    def queueRedirect(self, sourceQueue, targetQueue):
        """Enable/disable delivery redirect for indicated queues"""
        self._method("queueRedirect", {'sourceQueue':sourceQueue,'targetQueue':targetQueue})

    def setLogLevel(self, level):
        """Set the log level"""
        self._method("setLogLevel", {'level':level})

    def getLogLevel(self):
        """Get the log level"""
        return self._method('getLogLevel')

    def setTimestampConfig(self, receive):
        """Set the message timestamping configuration"""
        self._method("setTimestampConfig", {'receive':receive})

    def getTimestampConfig(self):
        """Get the message timestamping configuration"""
        return self._method('getTimestampConfig')

    def setLogHiresTimestamp(self, logHires):
        """Set the high resolution timestamp in logs"""
        self._method("setLogHiresTimestamp", {'logHires':logHires})

    def getLogHiresTimestamp(self):
        """Get the high resolution timestamp in logs"""
        return self._method('getLogHiresTimestamp')

    def addExchange(self, exchange_type, name, options={}, **kwargs):
        properties = {}
        properties['exchange-type'] = exchange_type
        for k,v in options.items():
            properties[k] = v
        for k,v in kwargs.items():
            properties[k] = v
        args = {'type':       'exchange',
                'name':        name,
                'properties':  properties,
                'strict':      True}
        self._method('create', args)

    def delExchange(self, name):
        args = {'type': 'exchange', 'name': name}
        self._method('delete', args)

    def addQueue(self, name, options={}, **kwargs):
        properties = options
        for k,v in kwargs.items():
            properties[k] = v
        args = {'type':       'queue',
                'name':        name,
                'properties':  properties,
                'strict':      True}
        self._method('create', args)

    def delQueue(self, name, if_empty=True, if_unused=True):
        options = {'if_empty':  if_empty,
                   'if_unused': if_unused}
        args = {'type':        'queue', 
                'name':         name,
                'options':      options}
        self._method('delete', args)

    def bind(self, exchange, queue, key="", options={}, **kwargs):
        properties = options
        for k,v in kwargs.items():
            properties[k] = v
        args = {'type':       'binding',
                'name':       "%s/%s/%s" % (exchange, queue, key),
                'properties':  properties,
                'strict':      True}
        self._method('create', args)

    def unbind(self, exchange, queue, key, **kwargs):
        args = {'type':       'binding',
                'name':       "%s/%s/%s" % (exchange, queue, key),
                'strict':      True}
        self._method('delete', args)

    def reloadAclFile(self):
        self._method('reloadACLFile', {}, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")

    def acl_lookup(self, userName, action, aclObj, aclObjName, propMap):
        args = {'userId':      userName,
                'action':      action,
                'object':      aclObj,
                'objectName':  aclObjName,
                'propertyMap': propMap}
        return self._method('Lookup', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")

    def acl_lookupPublish(self, userName, exchange, key):
        args = {'userId':       userName,
                'exchangeName': exchange,
                'routingKey':   key}
        return self._method('LookupPublish', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")

    def Redirect(self, sourceQueue, targetQueue):
        args = {'sourceQueue': sourceQueue,
                'targetQueue': targetQueue}
        return self._method('queueRedirect', args, "org.apache.qpid.broker:broker:amqp-broker")

    def create(self, _type, name, properties={}, strict=False):
        """Create an object of the specified type"""
        args = {'type': _type,
                'name': name,
                'properties': properties,
                'strict': strict}
        return self._method('create', args)

    def delete(self, _type, name, options):
        """Delete an object of the specified type"""
        args = {'type': _type,
                'name': name,
                'options': options}
        return self._method('delete', args)

    def list(self, _type):
        """List objects of the specified type"""
        return [i["_values"] for i in self._classQuery(_type.lower())]

    def query(self, _type, oid):
        """Query the current state of an object"""
        return self._get(self, _type, oid)


class BrokerObject(object):
    def __init__(self, broker, content):
        self.broker = broker
        self.content = content

    @property
    def values(self):
        return self.content['_values']

    def __getattr__(self, key):
        return self.values.get(key)

    def getObjectId(self):
        return self.content['_object_id']['_object_name']

    def getAttributes(self):
        return self.values

    def getCreateTime(self):
        return self.content['_create_ts']

    def getDeleteTime(self):
        return self.content['_delete_ts']

    def getUpdateTime(self):
        return self.content['_update_ts']

    def update(self):
        """
        Reload the property values from the agent.
        """
        refreshed = self.broker._get(self.__class__, self.getObjectId())
        if refreshed:
            self.content = refreshed.content
            self.values = self.content['_values']
        else:
            raise Exception("No longer exists on the broker")

    def __repr__(self):
        return "%s(%s)"%(self.__class__.__name__, self.content)

    def __str__(self):
        return self.getObjectId()


class Broker(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)

class Cluster(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)

class HaBroker(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)

class Memory(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)

class Connection(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)

    def close(self):
        self.broker._method("close", {}, "org.apache.qpid.broker:connection:%s" % self.address)

class Session(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)

class Subscription(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)

class Exchange(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)

class Binding(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)

class Queue(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)

    def purge(self, request):
        """Discard all or some messages on a queue"""
        self.broker._method("purge", {'request':request}, "org.apache.qpid.broker:queue:%s" % self.name)

    def reroute(self, request, useAltExchange, exchange, filter={}):
        """Remove all or some messages on this queue and route them to an exchange"""
        self.broker._method("reroute", {'request':request,'useAltExchange':useAltExchange,'exchange':exchange,'filter':filter},
                          "org.apache.qpid.broker:queue:%s" % self.name)

class Link(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)

class Acl(BrokerObject):
    def __init__(self, broker, values):
        BrokerObject.__init__(self, broker, values)

