const failable start()

in components/queue/queue-listener.cpp [47:101]


/**
 * Start the queue listener component.
 *
 * params carries, in order: the relay component reference, a lambda that
 * yields the AMQP key when applied to nilListValue, and a lambda that yields
 * the queue name when applied to nilListValue.
 *
 * The function declares the key / queue pair on a new AMQP session, submits a
 * listening task to a worker, and returns (wrapped in a value) a lambda that
 * shuts everything down when it is called with "stop" as its first argument.
 * Any other first argument makes that lambda return mkfailure<value>().
 */
const failable<value> start(const list<value>& params) {
    // First parameter: the relay reference. Second and third parameters are
    // lambdas; applying them to nilListValue gives the AMQP key and queue name.
    const value relayRef = car(params);
    const value rawKey = static_cast<lvvlambda>(cadr(params))(nilListValue);
    // The key is always handled as a list: a non-list key is wrapped in one.
    const value routingKey = isList(rawKey)? static_cast<list<value> >(rawKey) : mklist<value>(rawKey);
    const value queueName = static_cast<lvvlambda>(caddr(params))(nilListValue);

    // Open the AMQP connection and a session on top of it
    QpidConnection connection(false);
    QpidSession session(connection, false);

    // Make sure the configured key / queue pair is declared
    declareQueue(routingKey, queueName, session);

    // Subscription and worker used to listen for and relay messages
    // NOTE(review): worker(3) presumably sizes the worker at 3 threads - confirm against the worker class
    QpidSubscription subscription(session, false);
    const worker listenWorker(3);

    // Relay callback: posts each received AMQP message to the relay component
    // reference. The result of the post is not inspected and the callback
    // always returns true.
    const lambda<const bool(const value&, const value&)> relayMessage = [relayRef](const value& msgKey, const value& msgValue) -> const bool {
        debug(msgKey, "queue::relay::key");
        debug(msgValue, "queue::relay::value");
        // Same normalisation as for the configured key: always post a list
        const value postKey = isList(msgKey)? static_cast<list<value> >(msgKey) : mklist<value>(msgKey);
        const value posted = relayRef(mklist<value>("post", postKey, msgValue));
        return true;
    };

    // Task run by the worker: listen on the queue and hand each message to
    // the relay callback, returning whatever listen() returns.
    const lambda<const failable<bool>()> listenTask = [queueName, relayMessage, subscription]() -> const failable<bool> {
        const gc_pool pool;
        debug(queueName, "queue::subscribe::listen");
        // The by-copy capture is const inside this non-mutable lambda, hence the const_cast
        const failable<bool> outcome = listen(queueName, relayMessage, const_cast<QpidSubscription&>(subscription));
        debug(queueName, "queue::subscribe::stopped");
        return outcome;
    };
    submit<failable<bool> >(listenWorker, listenTask);

    // Build the component lambda handed back to the caller. It only
    // understands "stop"; the AMQP objects and the worker are captured by copy.
    // NOTE(review): this relies on the copies referring to the same underlying
    // connection / session / subscription / worker - confirm in the wrappers.
    const lvvlambda stopListener = [connection, session, subscription, listenWorker](const list<value>& args) -> const value {
        const tuscany::value op(car(args));

        // Anything other than a stop request is a failure
        if (op != "stop")
            return mkfailure<value>();
        debug("queue::listener::stop");

        // Tear down in order: subscription, session, connection, then the worker
        // TODO check why stop() and close() hang in child processes
        stop(const_cast<QpidSubscription&>(subscription));
        close(const_cast<QpidSession&>(session));
        close(const_cast<QpidConnection&>(connection));
        cancel(const_cast<worker&>(listenWorker));

        debug("queue::listener::stopped");
        // Hand back an empty lambda as the result of the stop request
        return failable<value>(value(lvvlambda()));
    };
    return value(stopListener);
}