in src/qmf/Agent.cpp [408:503]
/**
 * Handle one query-response message.
 *
 * The message properties drive the processing:
 *   - "partial":     when this property is absent, the message is treated as
 *                    the last one of the response.
 *   - "qmf.content": must be "_schema", "_schema_id" or "_data"; a message
 *                    without it, or with any other value, is ignored.
 *
 * The correlation id is parsed as a uint32_t (0 if it cannot be parsed) and
 * looked up in contextMap.  If a SyncContext is found, the content is
 * accumulated into that context's response and its condition is notified
 * once the last message arrives.  Otherwise a new CONSOLE_QUERY_RESPONSE
 * event is built and handed to the session.
 *
 * @param list  content items of the message; each item is read as a map.
 * @param msg   the received message (properties and correlation id are read).
 */
void AgentImpl::handleQueryResponse(const Variant::List& list, const Message& msg)
{
    const string& correlationId = msg.getCorrelationId();
    const Variant::Map& properties = msg.getProperties();

    // No "partial" marker means this message closes the response.
    const bool isLast = (properties.find("partial") == properties.end());

    Variant::Map::const_iterator contentProp = properties.find("qmf.content");
    if (contentProp == properties.end()) {
        return;
    }

    const string contentType(contentProp->second.asString());
    const bool isData     = (contentType == "_data");
    const bool isSchemaId = (contentType == "_schema_id");
    const bool isSchema   = (contentType == "_schema");
    if (!isData && !isSchemaId && !isSchema) {
        return;
    }

    // An unparsable correlation id falls back to correlator 0.
    uint32_t correlator;
    try {
        correlator = boost::lexical_cast<uint32_t>(correlationId);
    } catch (const boost::bad_lexical_cast&) {
        correlator = 0;
    }

    // Look up the context while holding the agent lock; the shared pointer
    // copy is then used after the lock has been released.
    boost::shared_ptr<SyncContext> syncContext;
    {
        qpid::sys::Mutex::ScopedLock agentGuard(lock);
        map<uint32_t, boost::shared_ptr<SyncContext> >::iterator found = contextMap.find(correlator);
        if (found != contextMap.end()) {
            syncContext = found->second;
        }
    }

    if (syncContext) {
        //
        // A context is registered for this correlator: accumulate the
        // content into its response (synchronous request).
        //
        qpid::sys::Mutex::ScopedLock contextGuard(syncContext->lock);

        // The response event is created by the first message that arrives.
        if (!syncContext->response.isValid()) {
            syncContext->response = ConsoleEvent(new ConsoleEventImpl(CONSOLE_QUERY_RESPONSE));
        }

        if (isData) {
            for (Variant::List::const_iterator item = list.begin(); item != list.end(); ++item) {
                Data data(new DataImpl(item->asMap(), this));
                ConsoleEventImplAccess::get(syncContext->response).addData(data);
                if (data.hasSchema()) {
                    learnSchemaId(data.getSchemaId());
                }
            }
        } else if (isSchemaId) {
            for (Variant::List::const_iterator item = list.begin(); item != list.end(); ++item) {
                SchemaId schemaId(new SchemaIdImpl(item->asMap()));
                ConsoleEventImplAccess::get(syncContext->response).addSchemaId(schemaId);
                learnSchemaId(schemaId);
            }
        } else {
            // "_schema": schemas go to the schema cache, not into the response.
            for (Variant::List::const_iterator item = list.begin(); item != list.end(); ++item) {
                Schema schema(new SchemaImpl(item->asMap()));
                schemaCache->declareSchema(schema);
            }
        }

        if (isLast) {
            ConsoleEventImplAccess::get(syncContext->response).setFinal();
            ConsoleEventImplAccess::get(syncContext->response).setAgent(this);
            syncContext->cond.notify();
        }
    } else {
        //
        // No context for this correlator: build a stand-alone event
        // (asynchronous request).
        //
        auto_ptr<ConsoleEventImpl> asyncEvent(new ConsoleEventImpl(CONSOLE_QUERY_RESPONSE));
        asyncEvent->setCorrelator(correlator);
        asyncEvent->setAgent(this);

        if (isData) {
            for (Variant::List::const_iterator item = list.begin(); item != list.end(); ++item) {
                Data data(new DataImpl(item->asMap(), this));
                asyncEvent->addData(data);
                if (data.hasSchema()) {
                    learnSchemaId(data.getSchemaId());
                }
            }
        } else if (isSchemaId) {
            for (Variant::List::const_iterator item = list.begin(); item != list.end(); ++item) {
                SchemaId schemaId(new SchemaIdImpl(item->asMap()));
                asyncEvent->addSchemaId(schemaId);
                learnSchemaId(schemaId);
            }
        } else {
            // "_schema": schemas go to the schema cache, not into the event.
            for (Variant::List::const_iterator item = list.begin(); item != list.end(); ++item) {
                Schema schema(new SchemaImpl(item->asMap()));
                schemaCache->declareSchema(schema);
            }
        }

        if (isLast) {
            asyncEvent->setFinal();
        }

        // Schema content is never enqueued; in that case the auto_ptr
        // destroys the event when it goes out of scope.
        if (!isSchema) {
            session.enqueueEvent(asyncEvent.release());
        }
    }
}