mdsdinput::Ack ProtocolHandlerJSON::decodeMsg()

in Diagnostic/mdsd/mdsd/ProtocolHandlerJSON.cc [154:369]


mdsdinput::Ack ProtocolHandlerJSON::decodeMsg(msg_data_t& msg_data, std::string& source, CanonicalEntity& ce)
{
    Trace trace(Trace::EventIngest, "ProtocolHandlerJSON::decodeMsg");

    mdsdinput::Ack ack;
    rapidjson::Document d;
    d.ParseInsitu(&msg_data[0]);

    ack.code = mdsdinput::ACK_DECODE_ERROR;

    // Build/fetch schema
    if (!d.IsArray())
    {
        throw std::runtime_error("Invalid JSON document: Was not an array");
    }
    if (d.Size() != 5)
    {
        std::ostringstream msg;
        msg << "Invalid JSON document: Array size invalid: Expected 5, got " << d.Size();
        throw std::runtime_error(msg.str());
    }

    const rapidjson::Value& jsource = d[0];
    const rapidjson::Value& jmsgId = d[1];
    const rapidjson::Value& jschemaId = d[2];
    const rapidjson::Value& jschema = d[3];
    const rapidjson::Value& jmsgdata = d[4];

    if (!jsource.IsString())
    {
        throw std::runtime_error("Invalid JSON document: source (0) is not a String");
    }

    if (!jmsgId.IsNumber())
    {
        throw std::runtime_error("Invalid JSON document: msgId (1) is not a Number");
    }

    if (!jschemaId.IsNumber())
    {
        throw std::runtime_error("Invalid JSON document: schemaId (2) is not a Number");
    }

    if (!jmsgdata.IsArray())
    {
        throw std::runtime_error("Invalid JSON document: data (4) is not an Array");
    }

    if (!jschema.IsNull() && !jschema.IsArray())
    {
        throw std::runtime_error("Invalid JSON document: schema (3) is not an Array");
    }

    auto schema_id = jschemaId.GetUint64();

    std::shared_ptr<mdsdinput::SchemaDef> schema;
    if (!jschema.IsNull())
    {
        bool hasTimestampIndex = false;
        uint32_t timestampIndex;
        schema = std::make_shared<mdsdinput::SchemaDef>();

        for (rapidjson::Value::ConstValueIterator it = jschema.Begin(); it != jschema.End(); ++it)
        {
            if (it == jschema.Begin() && !it->IsArray())
            {
                // If the first element of the array is not an array, not null, and is an unsigned integer
                // then use it as the timestamp index.
                if (!it->IsNull() && it->IsUint())
                {
                    hasTimestampIndex = true;
                    timestampIndex = static_cast<uint32_t>(it->GetUint64());
                }
            }
            else
            {
                if (!it->IsArray() || it->Size() != 2)
                {
                    throw std::runtime_error("Invalid Schema");
                }
                const rapidjson::Value &name = (*it)[0];
                const rapidjson::Value &ft = (*it)[1];

                if (!name.IsString() || !ft.IsString())
                {
                    throw std::runtime_error("Invalid Schema");
                }
                mdsdinput::FieldDef fd;
                fd.name = name.GetString();
                if (!ToEnum(fd.fieldType, ft.GetString()))
                {
                    throw std::runtime_error("Invalid Schema");
                }
                schema->fields.push_back(fd);
            }
        }

        if (hasTimestampIndex)
        {
            if (timestampIndex < schema->fields.size())
            {
                schema->timestampFieldIdx.set(timestampIndex);
            }
        }

        if (!_schema_cache->AddSchemaWithId(schema, schema_id))
        {
            ack.code = mdsdinput::ACK_DUPLICATE_SCHEMA_ID;
            return ack;
        }
    }
    else
    {
        try
        {
            schema = _schema_cache->GetSchema(schema_id);
        }
        catch(std::out_of_range)
        {
            ack.code = mdsdinput::ACK_UNKNOWN_SCHEMA_ID;
            return ack;
        }
    }

    if (schema->fields.size() != jmsgdata.Size())
    {
        std::ostringstream msg;
        msg << "Invalid message data: Array size invalid: Expected " << schema->fields.size() << ", got " << jmsgdata.Size();
        throw std::runtime_error(msg.str());
    }

    ack.msgId = jmsgId.GetInt64();
    source = std::string(jsource.GetString(), jsource.GetStringLength());

    //
    for (int i = 0; i < (int)schema->fields.size(); ++i)
    {
        mdsdinput::FieldDef fd = schema->fields.at(i);
        const rapidjson::Value& val = jmsgdata[i];
        switch (fd.fieldType)
        {
            case mdsdinput::FT_INVALID:
                ack.code = mdsdinput::ACK_DECODE_ERROR;
                return ack;
            case mdsdinput::FT_BOOL:
                if (!val.IsBool())
                {
                    throw std::runtime_error("Invalid Message data");
                }
                ce.AddColumnIgnoreMetaData(fd.name, new MdsValue(val.GetBool()));
                break;
            case mdsdinput::FT_INT32:
                if (!val.IsInt())
                {
                    throw std::runtime_error("Invalid Message data");
                }
                ce.AddColumnIgnoreMetaData(fd.name, new MdsValue(static_cast<long>(val.GetInt())));
                break;
            case mdsdinput::FT_INT64:
                if (!val.IsInt64())
                {
                    throw std::runtime_error("Invalid Message data");
                }
                // The explicit cast is necessary. Without it, the value will get treated as mt_int32.
                ce.AddColumnIgnoreMetaData(fd.name, new MdsValue(static_cast<long long>(val.GetInt64())));
                break;
            case mdsdinput::FT_DOUBLE:
                if (!val.IsNumber())
                {
                    throw std::runtime_error("Invalid Message data");
                }
                ce.AddColumnIgnoreMetaData(fd.name, new MdsValue(val.GetDouble()));
                break;
            case mdsdinput::FT_TIME:
                if (!val.IsArray() || val.Size() != 2)
                {
                    throw std::runtime_error("Invalid Message data");
                }
                {
                    MdsTime time(val[0].GetUint64(), val[1].GetUint()/1000);
                    ce.AddColumnIgnoreMetaData(fd.name,  new MdsValue(time));
                    if (!schema->timestampFieldIdx.empty() && static_cast<uint32_t>(i) == *(schema->timestampFieldIdx))
                    {
                        ce.SetPreciseTime(time);
                    }
                }
                break;
            case mdsdinput::FT_STRING:
                if (!val.IsString())
                {
                    throw std::runtime_error("Invalid Message data");
                }
                ce.AddColumnIgnoreMetaData(fd.name, new MdsValue(std::string(val.GetString(), val.GetStringLength())));
                break;
            default:
                throw std::runtime_error("Invalid field type in schema");
        }
    }

    SchemaCache::IdType mdsdSchemaId;
    auto it = _id_map.find(schema_id);
    if (it != _id_map.end())
    {
        mdsdSchemaId = it->second;
    } else {
        mdsdSchemaId = schema_id_for_key(_schema_cache->GetSchemaKey(schema_id));
        _id_map.insert(std::make_pair(schema_id, mdsdSchemaId));
        TRACEINFO(trace, "Mapped connection schemaId ("+std::to_string(schema_id)+") to SchemaCache id ("+std::to_string(mdsdSchemaId)+")");
    }

    ce.SetSchemaId(mdsdSchemaId);

    ack.code = mdsdinput::ACK_SUCCESS;

    return ack;
}