void AgentImpl::handleQueryResponse()

in src/qmf/Agent.cpp [408:503]


//
// Handle the content of one query-response message.
//
//   list - the response items; each element is read with asMap().
//   msg  - the message the items arrived in.  Its properties and its
//          correlation id are consulted.
//
// The message is ignored unless it has a "qmf.content" property equal to
// "_data", "_schema_id" or "_schema".  A message with no "partial" property
// is treated as the final response.
//
// The correlation id is parsed as a uint32_t (0 if it does not parse) and
// looked up in contextMap.  If a SyncContext is found, the items are
// accumulated into that context's response and, on the final message, the
// context's condition is notified.  Otherwise a new CONSOLE_QUERY_RESPONSE
// event is built and enqueued on the session (except for "_schema" content,
// which is only declared to the schema cache).
//
void AgentImpl::handleQueryResponse(const Variant::List& list, const Message& msg)
{
    typedef Variant::List::const_iterator ItemIter;

    const string& correlationId(msg.getCorrelationId());
    const Variant::Map& props(msg.getProperties());

    //
    // The absence of the "partial" property marks this response as final.
    //
    const bool isFinal(props.find("partial") == props.end());

    //
    // Only schema, schema-id and data content is handled here.
    //
    Variant::Map::const_iterator contentProp(props.find("qmf.content"));
    if (contentProp == props.end())
        return;

    const string contentType(contentProp->second.asString());
    const bool isData(contentType == "_data");
    const bool isSchemaId(contentType == "_schema_id");
    const bool isSchema(contentType == "_schema");
    if (!isData && !isSchemaId && !isSchema)
        return;

    //
    // A correlation id that is not a valid uint32_t leaves the correlator at zero.
    //
    uint32_t correlator(0);
    try {
        correlator = boost::lexical_cast<uint32_t>(correlationId);
    } catch (const boost::bad_lexical_cast&) {
    }

    //
    // Look for a context registered under this correlator.  The agent lock
    // is held only for the duration of the lookup.
    //
    boost::shared_ptr<SyncContext> syncContext;
    {
        qpid::sys::Mutex::ScopedLock guard(lock);
        map<uint32_t, boost::shared_ptr<SyncContext> >::iterator entry = contextMap.find(correlator);
        if (entry != contextMap.end())
            syncContext = entry->second;
    }

    if (syncContext) {
        //
        // This response is associated with a synchronous request.  The
        // context's lock is held while its response is updated.
        //
        qpid::sys::Mutex::ScopedLock contextGuard(syncContext->lock);
        if (!syncContext->response.isValid())
            syncContext->response = ConsoleEvent(new ConsoleEventImpl(CONSOLE_QUERY_RESPONSE));

        for (ItemIter item = list.begin(); item != list.end(); ++item) {
            if (isData) {
                Data data(new DataImpl(item->asMap(), this));
                ConsoleEventImplAccess::get(syncContext->response).addData(data);
                if (data.hasSchema())
                    learnSchemaId(data.getSchemaId());
            } else if (isSchemaId) {
                SchemaId schemaId(new SchemaIdImpl(item->asMap()));
                ConsoleEventImplAccess::get(syncContext->response).addSchemaId(schemaId);
                learnSchemaId(schemaId);
            } else if (isSchema) {
                // Schemas go to the cache only; they are not added to the response.
                Schema schema(new SchemaImpl(item->asMap()));
                schemaCache->declareSchema(schema);
            }
        }

        if (isFinal) {
            ConsoleEventImplAccess::get(syncContext->response).setFinal();
            ConsoleEventImplAccess::get(syncContext->response).setAgent(this);
            syncContext->cond.notify();
        }
        return;
    }

    //
    // This response is associated with an asynchronous request.
    //
    auto_ptr<ConsoleEventImpl> asyncEvent(new ConsoleEventImpl(CONSOLE_QUERY_RESPONSE));
    asyncEvent->setCorrelator(correlator);
    asyncEvent->setAgent(this);

    for (ItemIter item = list.begin(); item != list.end(); ++item) {
        if (isData) {
            Data data(new DataImpl(item->asMap(), this));
            asyncEvent->addData(data);
            if (data.hasSchema())
                learnSchemaId(data.getSchemaId());
        } else if (isSchemaId) {
            SchemaId schemaId(new SchemaIdImpl(item->asMap()));
            asyncEvent->addSchemaId(schemaId);
            learnSchemaId(schemaId);
        } else if (isSchema) {
            // Schemas go to the cache only; they are not added to the event.
            Schema schema(new SchemaImpl(item->asMap()));
            schemaCache->declareSchema(schema);
        }
    }

    if (isFinal)
        asyncEvent->setFinal();

    //
    // Schema content produces no event; the unreleased event is destroyed
    // when asyncEvent goes out of scope.
    //
    if (!isSchema)
        session.enqueueEvent(asyncEvent.release());
}