in components/queue/queue-listener.cpp [47:101]
/**
 * Start the queue listener component.
 *
 * params carries, in order: the relay component reference, a lambda that
 * yields the AMQP key when applied to nilListValue, and a lambda that yields
 * the queue name the same way.
 *
 * Declares the key / queue pair, submits a listen task to a worker whose
 * callback posts every received message to the relay reference, and returns
 * a lambda value that only handles the "stop" function.
 */
const failable<value> start(const list<value>& params) {

    // Pull the relay reference, the AMQP key and the queue name out of params;
    // a key configured as a single value is wrapped in a one element list
    const value relayRef = car(params);
    const value rawKey = ((lvvlambda)cadr(params))(nilListValue);
    const value keyList = isList(rawKey)? (list<value>)rawKey : mklist<value>(rawKey);
    const value queueName = ((lvvlambda)caddr(params))(nilListValue);

    // Set up the AMQP connection and a session using it
    QpidConnection connection(false);
    QpidSession session(connection, false);

    // Declare the configured AMQP key / queue pair on that session
    // NOTE(review): whatever declareQueue returns is not checked here
    declareQueue(keyList, queueName, session);

    // Create the subscription and the worker that will run the listen task
    QpidSubscription subscription(session, false);
    const worker listenWorker(3);

    // Relay callback: posts each key / value pair it receives to the relay
    // component reference. The value returned by the post is not inspected
    // and the callback always returns true.
    const lambda<const bool(const value&, const value&)> relayMessage = [relayRef](const value& msgKey, const value& msgValue) -> const bool {
        debug(msgKey, "queue::relay::key");
        debug(msgValue, "queue::relay::value");
        const value postKey = isList(msgKey)? (list<value>)msgKey : mklist<value>(msgKey);
        const value posted = relayRef(mklist<value>("post", postKey, msgValue));
        return true;
    };

    // Listen task: listens on the queue with the relay callback and hands
    // back the result of listen() once it returns
    const lambda<const failable<bool>()> runSubscription = [queueName, relayMessage, subscription]() -> const failable<bool> {
        const gc_pool pool;
        debug(queueName, "queue::subscribe::listen");
        // The lambda holds its own const copy of the subscription, listen() needs a non-const reference
        const failable<bool> outcome = listen(queueName, relayMessage, const_cast<QpidSubscription&>(subscription));
        debug(queueName, "queue::subscribe::stopped");
        return outcome;
    };
    submit<failable<bool> >(listenWorker, runSubscription);

    // Build the listener component lambda function handed back to the caller
    const lvvlambda listener = [connection, session, subscription, listenWorker](const list<value>& args) -> const value {
        const tuscany::value request(car(args));

        // Anything other than "stop" is answered with a failure
        if (request != "stop")
            return mkfailure<value>();
        debug("queue::listener::stop");

        // Stop the component: subscription first, then session, connection
        // and finally the worker
        // TODO check why stop() and close() hang in child processes
        stop(const_cast<QpidSubscription&>(subscription));
        close(const_cast<QpidSession&>(session));
        close(const_cast<QpidConnection&>(connection));
        cancel(const_cast<worker&>(listenWorker));
        debug("queue::listener::stopped");
        return failable<value>(value(lvvlambda()));
    };
    return value(listener);
}