lib/api/unittest/CJsonOutputWriterTest.cc (1,698 lines of code) (raw):

/* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License * 2.0 and the following additional limitation. Functionality enabled by the * files subject to the Elastic License 2.0 may only be used in production when * invoked by an Elasticsearch process with a license key installed that permits * use of machine learning features. You may not use this file except in * compliance with the Elastic License 2.0 and the foregoing additional * limitation. */ #include <core/CContainerPrinter.h> #include <core/CJsonOutputStreamWrapper.h> #include <core/COsFileFuncs.h> #include <core/CScopedBoostJsonPoolAllocator.h> #include <core/CSmallVector.h> #include <core/CTimeUtils.h> #include <model/CAnomalyDetector.h> #include <model/CAnomalyDetectorModelConfig.h> #include <model/CHierarchicalResultsNormalizer.h> #include <model/CLimits.h> #include <api/CGlobalCategoryId.h> #include <api/CJsonOutputWriter.h> #include <test/BoostTestCloseAbsolute.h> #include <boost/test/unit_test.hpp> #include <fstream> #include <sstream> #include <string> BOOST_AUTO_TEST_SUITE(CJsonOutputWriterTest) namespace { using TDouble1Vec = ml::core::CSmallVector<double, 1>; using TStr1Vec = ml::core::CSmallVector<std::string, 1>; const TStr1Vec EMPTY_STRING_LIST; void testBucketWriteHelper(bool isInterim) { // groups output by bucket/detector std::ostringstream sstream; // The output writer won't close the JSON structures until is is destroyed { ml::core::CJsonOutputStreamWrapper outputStream(sstream); ml::api::CJsonOutputWriter writer("job", outputStream); std::string partitionFieldName("tfn"); std::string partitionFieldValue(""); std::string overFieldName("pfn"); std::string overFieldValue("pfv"); std::string byFieldName("airline"); std::string byFieldValue("GAL"); std::string correlatedByFieldValue("BAW"); std::string fieldName("responsetime"); std::string function("mean"); std::string functionDescription("mean(responsetime)"); std::string emptyString; ml::api::CHierarchicalResultsWriter::TOptionalStrOptionalStrPrDoublePrVec influences; { ml::api::CHierarchicalResultsWriter::SResults result11( false, false, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 1, function, functionDescription, TDouble1Vec(1, 10090.0), TDouble1Vec(1, 6953.0), 2.24, 0.5, 0.0, 79, fieldName, influences, false, false, 1, 100); ml::api::CHierarchicalResultsWriter::SResults result112( false, true, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 1, function, functionDescription, TDouble1Vec(1, 10090.0), TDouble1Vec(1, 6953.0), 2.24, 0.5, 0.0, 79, fieldName, influences, false, false, 1, 100); ml::api::CHierarchicalResultsWriter::SResults result12( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 1, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 2.24, 0.8, 0.0, -5.0, fieldName, influences, false, true, 2, 100, EMPTY_STRING_LIST, {}); ml::api::CHierarchicalResultsWriter::SResults result13( ml::api::CHierarchicalResultsWriter::E_SimpleCountResult, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 1, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 2.24, 0.5, 0.0, -5.0, fieldName, influences, false, false, 3, 100, EMPTY_STRING_LIST, {}); ml::api::CHierarchicalResultsWriter::SResults result14( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 1, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 2.24, 0.0, 0.0, -5.0, fieldName, influences, false, false, 4, 100, EMPTY_STRING_LIST, {}); // 1st bucket BOOST_TEST_REQUIRE(writer.acceptResult(result11)); BOOST_TEST_REQUIRE(writer.acceptResult(result11)); BOOST_TEST_REQUIRE(writer.acceptResult(result112)); BOOST_TEST_REQUIRE(writer.acceptResult(result12)); BOOST_TEST_REQUIRE(writer.acceptResult(result12)); BOOST_TEST_REQUIRE(writer.acceptResult(result13)); BOOST_TEST_REQUIRE(writer.acceptResult(result13)); BOOST_TEST_REQUIRE(writer.acceptResult(result14)); BOOST_TEST_REQUIRE(writer.acceptResult(result14)); writer.acceptBucketTimeInfluencer(1, 0.01, 13.44, 70.0); } { ml::api::CHierarchicalResultsWriter::SResults result21( false, false, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 2, function, functionDescription, TDouble1Vec(1, 10090.0), TDouble1Vec(1, 6953.0), 2.24, 0.6, 0.0, 79, fieldName, influences, false, false, 1, 100); ml::api::CHierarchicalResultsWriter::SResults result212( false, true, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 2, function, functionDescription, TDouble1Vec(1, 10090.0), TDouble1Vec(1, 6953.0), 2.24, 0.6, 0.0, 79, fieldName, influences, false, false, 1, 100); ml::api::CHierarchicalResultsWriter::SResults result22( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 2, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 2.24, 0.8, 0.0, -5.0, fieldName, influences, false, true, 2, 100, EMPTY_STRING_LIST, {}); ml::api::CHierarchicalResultsWriter::SResults result23( ml::api::CHierarchicalResultsWriter::E_SimpleCountResult, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 2, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 2.24, 0.0, 0.0, -5.0, fieldName, influences, false, false, 3, 100, EMPTY_STRING_LIST, {}); ml::api::CHierarchicalResultsWriter::SResults result24( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 2, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 2.24, 0.0, 0.0, -5.0, fieldName, influences, false, false, 4, 100, EMPTY_STRING_LIST, {}); // 2nd bucket BOOST_TEST_REQUIRE(writer.acceptResult(result21)); BOOST_TEST_REQUIRE(writer.acceptResult(result21)); BOOST_TEST_REQUIRE(writer.acceptResult(result212)); BOOST_TEST_REQUIRE(writer.acceptResult(result22)); BOOST_TEST_REQUIRE(writer.acceptResult(result22)); BOOST_TEST_REQUIRE(writer.acceptResult(result23)); BOOST_TEST_REQUIRE(writer.acceptResult(result23)); BOOST_TEST_REQUIRE(writer.acceptResult(result24)); BOOST_TEST_REQUIRE(writer.acceptResult(result24)); writer.acceptBucketTimeInfluencer(2, 0.01, 13.44, 70.0); } { ml::api::CHierarchicalResultsWriter::SResults result31( false, false, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 3, function, functionDescription, TDouble1Vec(1, 10090.0), TDouble1Vec(1, 6953.0), 2.24, 0.8, 0.0, 79, fieldName, influences, false, false, 1, 100); ml::api::CHierarchicalResultsWriter::SResults result312( false, true, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 3, function, functionDescription, TDouble1Vec(1, 10090.0), TDouble1Vec(1, 6953.0), 2.24, 0.8, 0.0, 79, fieldName, influences, false, false, 1, 100); ml::api::CHierarchicalResultsWriter::SResults result32( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 3, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 2.24, 0.0, 0.0, -5.0, fieldName, influences, false, true, 2, 100, EMPTY_STRING_LIST, {}); ml::api::CHierarchicalResultsWriter::SResults result33( ml::api::CHierarchicalResultsWriter::E_SimpleCountResult, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 3, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 2.24, 0.0, 0.0, -5.0, fieldName, influences, false, false, 3, 100, EMPTY_STRING_LIST, {}); ml::api::CHierarchicalResultsWriter::SResults result34( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 3, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 2.24, 0.0, 0.0, -5.0, fieldName, influences, false, false, 4, 100, EMPTY_STRING_LIST, {}); // 3rd bucket BOOST_TEST_REQUIRE(writer.acceptResult(result31)); BOOST_TEST_REQUIRE(writer.acceptResult(result31)); BOOST_TEST_REQUIRE(writer.acceptResult(result312)); BOOST_TEST_REQUIRE(writer.acceptResult(result32)); BOOST_TEST_REQUIRE(writer.acceptResult(result32)); BOOST_TEST_REQUIRE(writer.acceptResult(result33)); BOOST_TEST_REQUIRE(writer.acceptResult(result33)); BOOST_TEST_REQUIRE(writer.acceptResult(result34)); BOOST_TEST_REQUIRE(writer.acceptResult(result34)); writer.acceptBucketTimeInfluencer(3, 0.01, 13.44, 70.0); } // Finished adding results BOOST_TEST_REQUIRE(writer.endOutputBatch(isInterim, 10U)); } json::error_code ec; json::value arrayDoc = json::parse(sstream.str(), ec); BOOST_TEST_REQUIRE(ec.failed() == false); BOOST_TEST_REQUIRE(arrayDoc.is_array()); LOG_DEBUG(<< "Results:\n" << arrayDoc); // There are 3 buckets and 3 record arrays in the order: r1, b1, r2, b2, r3, b3 BOOST_REQUIRE_EQUAL(6, arrayDoc.as_array().size()); int bucketTimes[] = {1000, 1000, 2000, 2000, 3000, 3000}; // Assert buckets for (std::size_t i = 1; i < arrayDoc.as_array().size(); i = i + 2) { int buckettime = bucketTimes[i]; const json::value& bucketWrapper_ = arrayDoc.as_array().at(i); const json::object& bucketWrapper = bucketWrapper_.as_object(); BOOST_TEST_REQUIRE(bucketWrapper.contains("bucket")); const json::value& bucket_ = bucketWrapper.at("bucket"); BOOST_TEST_REQUIRE(bucket_.is_object()); const json::object& bucket = bucket_.as_object(); BOOST_TEST_REQUIRE(bucket.contains("job_id")); BOOST_REQUIRE_EQUAL("job", bucket.at("job_id").as_string()); // 3 detectors each have 2 records (simple count detector isn't added) // except the population detector which has a single record and clauses BOOST_REQUIRE_EQUAL(buckettime, bucket.at("timestamp").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(bucket.contains("bucket_influencers")); const json::value& bucketInfluencers_ = bucket.at("bucket_influencers"); BOOST_TEST_REQUIRE(bucketInfluencers_.is_array()); const json::array& bucketInfluencers = bucketInfluencers_.as_array(); BOOST_REQUIRE_EQUAL(std::size_t(1), bucketInfluencers.size()); const json::value& bucketInfluencer_ = bucketInfluencers[std::size_t(0)]; const json::object& bucketInfluencer = bucketInfluencer_.as_object(); BOOST_REQUIRE_CLOSE_ABSOLUTE( 13.44, bucketInfluencer.at("raw_anomaly_score").to_number<double>(), 0.00001); BOOST_REQUIRE_CLOSE_ABSOLUTE( 0.01, bucketInfluencer.at("probability").to_number<double>(), 0.00001); BOOST_REQUIRE_CLOSE_ABSOLUTE( 70.0, bucketInfluencer.at("initial_anomaly_score").to_number<double>(), 0.00001); BOOST_TEST_REQUIRE(bucketInfluencer.contains("anomaly_score")); BOOST_REQUIRE_CLOSE_ABSOLUTE( 70.0, bucketInfluencer.at("anomaly_score").to_number<double>(), 0.00001); BOOST_REQUIRE_EQUAL("bucket_time", bucketInfluencer.at("influencer_field_name").as_string()); BOOST_REQUIRE_EQUAL(79, bucket.at("event_count").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(bucket.contains("anomaly_score")); BOOST_REQUIRE_CLOSE_ABSOLUTE( 70.0, bucket.at("anomaly_score").to_number<double>(), 0.00001); BOOST_TEST_REQUIRE(bucket.contains("initial_anomaly_score")); BOOST_REQUIRE_CLOSE_ABSOLUTE( 70.0, bucket.at("initial_anomaly_score").to_number<double>(), 0.00001); if (isInterim) { BOOST_TEST_REQUIRE(bucket.contains("is_interim")); BOOST_REQUIRE_EQUAL(isInterim, bucket.at("is_interim").as_bool()); } else { BOOST_TEST_REQUIRE(!bucket.contains("is_interim")); } BOOST_REQUIRE_EQUAL(std::uint64_t(10ll), bucket.at("processing_time_ms").to_number<std::uint64_t>()); } for (std::size_t i = 0; i < arrayDoc.as_array().size(); i = i + 2) { int buckettime = bucketTimes[i]; const json::value& recordsWrapper_ = arrayDoc.as_array().at(i); const json::object& recordsWrapper = recordsWrapper_.as_object(); BOOST_TEST_REQUIRE(recordsWrapper.contains("records")); const json::value& records_ = recordsWrapper.at("records"); BOOST_TEST_REQUIRE(records_.is_array()); const json::array& records = records_.as_array(); BOOST_REQUIRE_EQUAL(std::size_t(5), records.size()); // 1st record is for population detector { const json::value& record_ = records[std::size_t(0)]; const json::object& record = record_.as_object(); BOOST_TEST_REQUIRE(record.contains("job_id")); BOOST_REQUIRE_EQUAL("job", record.at("job_id").as_string()); BOOST_TEST_REQUIRE(record.contains("detector_index")); BOOST_REQUIRE_EQUAL(1, record.at("detector_index").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(record.contains("timestamp")); BOOST_REQUIRE_EQUAL(buckettime, record.at("timestamp").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(record.contains("probability")); BOOST_REQUIRE_EQUAL(0.0, record.at("probability").to_number<double>()); BOOST_TEST_REQUIRE(record.contains("by_field_name")); BOOST_REQUIRE_EQUAL("airline", record.at("by_field_name").as_string()); BOOST_TEST_REQUIRE(!record.contains("by_field_value")); BOOST_TEST_REQUIRE(!record.contains("correlated_by_field_value")); BOOST_TEST_REQUIRE(record.contains("function")); BOOST_REQUIRE_EQUAL("mean", record.at("function").as_string()); BOOST_TEST_REQUIRE(record.contains("function_description")); BOOST_REQUIRE_EQUAL("mean(responsetime)", record.at("function_description").as_string()); BOOST_TEST_REQUIRE(record.contains("over_field_name")); BOOST_REQUIRE_EQUAL("pfn", record.at("over_field_name").as_string()); BOOST_TEST_REQUIRE(record.contains("over_field_value")); BOOST_REQUIRE_EQUAL("pfv", record.at("over_field_value").as_string()); BOOST_TEST_REQUIRE(record.contains("bucket_span")); BOOST_REQUIRE_EQUAL(100, record.at("bucket_span").to_number<std::int64_t>()); // It's hard to predict what these will be, so just assert their // presence BOOST_TEST_REQUIRE(record.contains("initial_record_score")); BOOST_TEST_REQUIRE(record.contains("record_score")); if (isInterim) { BOOST_TEST_REQUIRE(record.contains("is_interim")); BOOST_REQUIRE_EQUAL(isInterim, record.at("is_interim").as_bool()); } else { BOOST_TEST_REQUIRE(!record.contains("is_interim")); } BOOST_TEST_REQUIRE(record.contains("causes")); const json::value& causes_ = record.at("causes"); BOOST_TEST_REQUIRE(causes_.is_array()); const json::array& causes = causes_.as_array(); BOOST_REQUIRE_EQUAL(std::size_t(2), causes.size()); for (std::size_t k = 0; k < causes.size(); k++) { const json::value& cause_ = causes[k]; const json::object& cause = cause_.as_object(); BOOST_TEST_REQUIRE(cause.contains("probability")); BOOST_REQUIRE_EQUAL(0.0, cause.at("probability").to_number<double>()); BOOST_TEST_REQUIRE(cause.contains("field_name")); BOOST_REQUIRE_EQUAL("responsetime", cause.at("field_name").as_string()); BOOST_TEST_REQUIRE(cause.contains("by_field_name")); BOOST_REQUIRE_EQUAL("airline", cause.at("by_field_name").as_string()); BOOST_TEST_REQUIRE(cause.contains("by_field_value")); BOOST_REQUIRE_EQUAL("GAL", cause.at("by_field_value").as_string()); BOOST_TEST_REQUIRE(cause.contains("correlated_by_field_value")); BOOST_REQUIRE_EQUAL("BAW", cause.at("correlated_by_field_value").as_string()); BOOST_TEST_REQUIRE(cause.contains("partition_field_name")); BOOST_REQUIRE_EQUAL("tfn", cause.at("partition_field_name").as_string()); BOOST_TEST_REQUIRE(cause.contains("partition_field_value")); BOOST_REQUIRE_EQUAL("", cause.at("partition_field_value").as_string()); BOOST_TEST_REQUIRE(cause.contains("function")); BOOST_REQUIRE_EQUAL("mean", cause.at("function").as_string()); BOOST_TEST_REQUIRE(cause.contains("function_description")); BOOST_REQUIRE_EQUAL("mean(responsetime)", cause.at("function_description").as_string()); BOOST_TEST_REQUIRE(cause.contains("typical")); BOOST_TEST_REQUIRE(cause.at("typical").is_array()); BOOST_REQUIRE_EQUAL(std::size_t(1), cause.at("typical").as_array().size()); BOOST_REQUIRE_EQUAL( 6953.0, cause.at("typical").as_array().at(std::size_t(0)).to_number<double>()); BOOST_TEST_REQUIRE(cause.contains("actual")); BOOST_TEST_REQUIRE(cause.at("actual").is_array()); BOOST_REQUIRE_EQUAL(std::size_t(1), cause.at("actual").as_array().size()); BOOST_REQUIRE_EQUAL( 10090.0, cause.at("actual").as_array().at(std::size_t(0)).to_number<double>()); BOOST_TEST_REQUIRE(cause.contains("function")); } } // Next 2 records are for metric detector { for (std::size_t k = 1; k < 3; k++) { const json::value& record_ = records[k]; const json::object& record = record_.as_object(); BOOST_TEST_REQUIRE(record.contains("job_id")); BOOST_REQUIRE_EQUAL("job", record.at("job_id").as_string()); BOOST_TEST_REQUIRE(record.contains("detector_index")); BOOST_REQUIRE_EQUAL( 2, record.at("detector_index").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(record.contains("timestamp")); BOOST_REQUIRE_EQUAL( buckettime, record.at("timestamp").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(record.contains("probability")); BOOST_REQUIRE_EQUAL(0.0, record.at("probability").to_number<double>()); BOOST_TEST_REQUIRE(record.contains("by_field_name")); BOOST_REQUIRE_EQUAL("airline", record.at("by_field_name").as_string()); BOOST_TEST_REQUIRE(record.contains("by_field_value")); BOOST_REQUIRE_EQUAL("GAL", record.at("by_field_value").as_string()); BOOST_TEST_REQUIRE(record.contains("correlated_by_field_value")); BOOST_REQUIRE_EQUAL( "BAW", record.at("correlated_by_field_value").as_string()); BOOST_TEST_REQUIRE(record.contains("typical")); BOOST_TEST_REQUIRE(record.at("typical").is_array()); BOOST_REQUIRE_EQUAL(std::size_t(1), record.at("typical").as_array().size()); BOOST_REQUIRE_EQUAL( 6953.0, record.at("typical").as_array().at(std::size_t(0)).to_number<double>()); BOOST_TEST_REQUIRE(record.contains("actual")); BOOST_TEST_REQUIRE(record.at("actual").is_array()); BOOST_REQUIRE_EQUAL(std::size_t(1), record.at("actual").as_array().size()); BOOST_REQUIRE_EQUAL( 10090.0, record.at("actual").as_array().at(std::size_t(0)).to_number<double>()); BOOST_TEST_REQUIRE(record.contains("field_name")); BOOST_REQUIRE_EQUAL("responsetime", record.at("field_name").as_string()); BOOST_TEST_REQUIRE(record.contains("function")); BOOST_REQUIRE_EQUAL("mean", record.at("function").as_string()); BOOST_TEST_REQUIRE(record.contains("function_description")); BOOST_REQUIRE_EQUAL("mean(responsetime)", record.at("function_description").as_string()); BOOST_TEST_REQUIRE(record.contains("partition_field_name")); BOOST_REQUIRE_EQUAL("tfn", record.at("partition_field_name").as_string()); BOOST_TEST_REQUIRE(record.contains("partition_field_value")); BOOST_REQUIRE_EQUAL("", record.at("partition_field_value").as_string()); BOOST_TEST_REQUIRE(record.contains("bucket_span")); BOOST_REQUIRE_EQUAL(100, record.at("bucket_span").to_number<std::int64_t>()); // It's hard to predict what these will be, so just assert their // presence BOOST_TEST_REQUIRE(record.contains("initial_record_score")); BOOST_TEST_REQUIRE(record.contains("record_score")); if (isInterim) { BOOST_TEST_REQUIRE(record.contains("is_interim")); BOOST_REQUIRE_EQUAL(isInterim, record.at("is_interim").as_bool()); } else { BOOST_TEST_REQUIRE(!record.contains("is_interim")); } } } // Last 2 records are for event rate detector { for (std::size_t k = 3; k < 5; k++) { const json::value& record_ = records[k]; const json::object& record = record_.as_object(); BOOST_TEST_REQUIRE(record.contains("job_id")); BOOST_REQUIRE_EQUAL("job", record.at("job_id").as_string()); BOOST_TEST_REQUIRE(record.contains("detector_index")); BOOST_REQUIRE_EQUAL( 4, record.at("detector_index").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(record.contains("timestamp")); BOOST_REQUIRE_EQUAL( buckettime, record.at("timestamp").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(record.contains("probability")); BOOST_REQUIRE_EQUAL(0.0, record.at("probability").to_number<double>()); BOOST_TEST_REQUIRE(record.contains("by_field_name")); BOOST_REQUIRE_EQUAL("airline", record.at("by_field_name").as_string()); BOOST_TEST_REQUIRE(record.contains("by_field_value")); BOOST_REQUIRE_EQUAL("GAL", record.at("by_field_value").as_string()); BOOST_TEST_REQUIRE(record.contains("correlated_by_field_value")); BOOST_REQUIRE_EQUAL( "BAW", record.at("correlated_by_field_value").as_string()); BOOST_TEST_REQUIRE(record.contains("typical")); BOOST_TEST_REQUIRE(record.at("typical").is_array()); BOOST_REQUIRE_EQUAL(std::size_t(1), record.at("typical").as_array().size()); BOOST_REQUIRE_EQUAL( 6953.0, record.at("typical").as_array().at(std::size_t(0)).to_number<double>()); BOOST_TEST_REQUIRE(record.contains("actual")); BOOST_TEST_REQUIRE(record.at("actual").is_array()); BOOST_REQUIRE_EQUAL(std::size_t(1), record.at("actual").as_array().size()); BOOST_REQUIRE_EQUAL( 10090.0, record.at("actual").as_array().at(std::size_t(0)).to_number<double>()); BOOST_TEST_REQUIRE(record.contains("function")); // This would be count in the real case with properly generated input data BOOST_REQUIRE_EQUAL("mean", record.at("function").as_string()); BOOST_TEST_REQUIRE(record.contains("function_description")); BOOST_REQUIRE_EQUAL("mean(responsetime)", record.at("function_description").as_string()); BOOST_TEST_REQUIRE(record.contains("partition_field_name")); BOOST_REQUIRE_EQUAL("tfn", record.at("partition_field_name").as_string()); BOOST_TEST_REQUIRE(record.contains("partition_field_value")); BOOST_REQUIRE_EQUAL("", record.at("partition_field_value").as_string()); BOOST_TEST_REQUIRE(record.contains("bucket_span")); BOOST_REQUIRE_EQUAL(100, record.at("bucket_span").to_number<std::int64_t>()); // It's hard to predict what these will be, so just assert their // presence BOOST_TEST_REQUIRE(record.contains("initial_record_score")); BOOST_TEST_REQUIRE(record.contains("record_score")); if (isInterim) { BOOST_TEST_REQUIRE(record.contains("is_interim")); BOOST_REQUIRE_EQUAL(isInterim, record.at("is_interim").as_bool()); } else { BOOST_TEST_REQUIRE(!record.contains("is_interim")); } } } } } void testLimitedRecordsWriteHelper(bool isInterim) { // Tests CJsonOutputWriter::limitNumberRecords(size_t) // set the record limit for each detector to 2 std::ostringstream sstream; // The output writer won't close the JSON structures until it is destroyed { ml::core::CJsonOutputStreamWrapper outputStream(sstream); ml::api::CJsonOutputWriter writer("job", outputStream); writer.limitNumberRecords(2); std::string partitionFieldName("tfn"); std::string partitionFieldValue("tfv"); std::string overFieldName("pfn"); std::string overFieldValue("pfv"); std::string byFieldName("airline"); std::string byFieldValue("GAL"); std::string fieldName("responsetime"); std::string function("mean"); std::string functionDescription("mean(responsetime)"); std::string emptyString; ml::api::CHierarchicalResultsWriter::TOptionalStrOptionalStrPrDoublePrVec influences; { // 1st bucket ml::api::CHierarchicalResultsWriter::SResults result111( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, emptyString, 1, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 0.0, 0.1, 0.1, -5.0, fieldName, influences, false, true, 1, 100, EMPTY_STRING_LIST, {}); BOOST_TEST_REQUIRE(writer.acceptResult(result111)); ml::api::CHierarchicalResultsWriter::SResults result112( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, emptyString, 1, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 0.0, 0.1, 0.2, -5.0, fieldName, influences, false, true, 1, 100, EMPTY_STRING_LIST, {}); BOOST_TEST_REQUIRE(writer.acceptResult(result112)); ml::api::CHierarchicalResultsWriter::SResults result113( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, emptyString, 1, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 2.0, 0.0, 0.4, -5.0, fieldName, influences, false, true, 1, 100, EMPTY_STRING_LIST, {}); BOOST_TEST_REQUIRE(writer.acceptResult(result113)); ml::api::CHierarchicalResultsWriter::SResults result114( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, emptyString, 1, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 12.0, 0.0, 0.4, -5.0, fieldName, influences, false, true, 1, 100, EMPTY_STRING_LIST, {}); BOOST_TEST_REQUIRE(writer.acceptResult(result114)); BOOST_TEST_REQUIRE(writer.acceptResult(result114)); overFieldName = "ofn"; overFieldValue = "ofv"; ml::api::CHierarchicalResultsWriter::SResults result121( false, false, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, emptyString, emptyString, emptyString, 1, function, functionDescription, TDouble1Vec(1, 10090.0), TDouble1Vec(1, 6953.0), 12.0, 0.0, 0.01, 79, fieldName, influences, false, true, 2, 100); BOOST_TEST_REQUIRE(writer.acceptResult(result121)); ml::api::CHierarchicalResultsWriter::SResults result122( false, true, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, byFieldName, byFieldValue, emptyString, 1, function, functionDescription, TDouble1Vec(1, 10090.0), TDouble1Vec(1, 6953.0), 12.0, 0.0, 0.01, 79, fieldName, influences, false, true, 2, 100); BOOST_TEST_REQUIRE(writer.acceptResult(result122)); ml::api::CHierarchicalResultsWriter::SResults result123( false, false, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, byFieldName, byFieldValue, emptyString, 1, function, functionDescription, TDouble1Vec(1, 10090.0), TDouble1Vec(1, 6953.0), 0.5, 0.0, 0.5, 79, fieldName, influences, false, true, 2, 100); BOOST_TEST_REQUIRE(writer.acceptResult(result123)); ml::api::CHierarchicalResultsWriter::SResults result124( false, true, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, emptyString, emptyString, emptyString, 1, function, functionDescription, TDouble1Vec(1, 10090.0), TDouble1Vec(1, 6953.0), 0.5, 0.0, 0.5, 79, fieldName, influences, false, true, 2, 100); BOOST_TEST_REQUIRE(writer.acceptResult(result124)); ml::api::CHierarchicalResultsWriter::SResults result125( false, false, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, byFieldName, byFieldValue, emptyString, 1, function, functionDescription, TDouble1Vec(1, 10090.0), TDouble1Vec(1, 6953.0), 6.0, 0.0, 0.5, 79, fieldName, influences, false, true, 2, 100); BOOST_TEST_REQUIRE(writer.acceptResult(result125)); ml::api::CHierarchicalResultsWriter::SResults result126( false, true, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, emptyString, emptyString, emptyString, 1, function, functionDescription, TDouble1Vec(1, 10090.0), TDouble1Vec(1, 6953.0), 6.0, 0.0, 0.05, 79, fieldName, influences, false, true, 2, 100); BOOST_TEST_REQUIRE(writer.acceptResult(result126)); } { // 2nd bucket overFieldName.clear(); overFieldValue.clear(); ml::api::CHierarchicalResultsWriter::SResults result211( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, emptyString, 2, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 1.0, 0.0, 0.05, -5.0, fieldName, influences, false, true, 1, 100, EMPTY_STRING_LIST, {}); BOOST_TEST_REQUIRE(writer.acceptResult(result211)); ml::api::CHierarchicalResultsWriter::SResults result212( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, emptyString, 2, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 7.0, 0.0, 0.001, -5.0, fieldName, influences, false, true, 1, 100, EMPTY_STRING_LIST, {}); BOOST_TEST_REQUIRE(writer.acceptResult(result212)); ml::api::CHierarchicalResultsWriter::SResults result213( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, emptyString, 2, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 0.6, 0.0, 0.1, -5.0, fieldName, influences, false, true, 1, 100, EMPTY_STRING_LIST, {}); BOOST_TEST_REQUIRE(writer.acceptResult(result213)); BOOST_TEST_REQUIRE(writer.acceptResult(result213)); overFieldName = "ofn"; overFieldValue = "ofv"; ml::api::CHierarchicalResultsWriter::SResults result221( false, false, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, byFieldName, byFieldValue, emptyString, 2, function, functionDescription, TDouble1Vec(1, 10090.0), TDouble1Vec(1, 6953.0), 0.6, 0.0, 0.1, 79, fieldName, influences, false, true, 2, 100); BOOST_TEST_REQUIRE(writer.acceptResult(result221)); BOOST_TEST_REQUIRE(writer.acceptResult(result221)); ml::api::CHierarchicalResultsWriter::SResults result222( false, false, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, emptyString, emptyString, emptyString, 2, function, functionDescription, TDouble1Vec(1, 10090.0), TDouble1Vec(1, 6953.0), 0.6, 0.0, 0.1, 79, fieldName, influences, false, true, 2, 100); BOOST_TEST_REQUIRE(writer.acceptResult(result222)); ml::api::CHierarchicalResultsWriter::SResults result223( false, false, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, byFieldName, byFieldValue, emptyString, 2, function, functionDescription, TDouble1Vec(1, 10090.0), TDouble1Vec(1, 6953.0), 3.0, 0.0, 0.02, 79, fieldName, influences, false, true, 2, 100); BOOST_TEST_REQUIRE(writer.acceptResult(result223)); ml::api::CHierarchicalResultsWriter::SResults result224( false, true, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, emptyString, emptyString, emptyString, 2, function, functionDescription, TDouble1Vec(1, 10090.0), TDouble1Vec(1, 6953.0), 20.0, 0.0, 0.02, 79, fieldName, influences, false, true, 2, 100); BOOST_TEST_REQUIRE(writer.acceptResult(result224)); } { // 3rd bucket overFieldName.clear(); overFieldValue.clear(); ml::api::CHierarchicalResultsWriter::SResults result311( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, emptyString, 3, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 30.0, 0.0, 0.02, -5.0, fieldName, influences, false, true, 1, 100, EMPTY_STRING_LIST, {}); BOOST_TEST_REQUIRE(writer.acceptResult(result311)); overFieldName = "ofn"; overFieldValue = "ofv"; ml::api::CHierarchicalResultsWriter::SResults result321( false, false, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, byFieldName, byFieldValue, emptyString, 3, function, functionDescription, TDouble1Vec(1, 10090.0), TDouble1Vec(1, 6953.0), 31.0, 0.0, 0.0002, 79, fieldName, influences, false, true, 2, 100); BOOST_TEST_REQUIRE(writer.acceptResult(result321)); ml::api::CHierarchicalResultsWriter::SResults result322( false, true, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, emptyString, emptyString, emptyString, 3, function, functionDescription, TDouble1Vec(1, 10090.0), TDouble1Vec(1, 6953.0), 31.0, 0.0, 0.0002, 79, fieldName, influences, false, true, 2, 100); BOOST_TEST_REQUIRE(writer.acceptResult(result322)); } // Finished adding results BOOST_TEST_REQUIRE(writer.endOutputBatch(isInterim, 10U)); } json::error_code ec; json::value arrayDoc_ = json::parse(sstream.str(), ec); BOOST_TEST_REQUIRE(ec.failed() == false); LOG_DEBUG(<< "Results:\n" << arrayDoc_); BOOST_TEST_REQUIRE(arrayDoc_.is_array()); const json::array& arrayDoc = arrayDoc_.as_array(); BOOST_REQUIRE_EQUAL(std::size_t(6), arrayDoc.size()); // buckets and records are the top level objects // records corresponding to a bucket appear first. The bucket follows. // each bucket has max 2 records from either both or // one or the other of the 2 detectors used. // records are sorted by probability. // bucket total anomaly score is the sum of all anomalies not just those printed. { const json::value& bucketWrapper_ = arrayDoc.at(std::size_t(1)); BOOST_TEST_REQUIRE(bucketWrapper_.is_object()); const json::object& bucketWrapper = bucketWrapper_.as_object(); BOOST_TEST_REQUIRE(bucketWrapper.contains("bucket")); const json::value& bucket_ = bucketWrapper.at("bucket"); BOOST_TEST_REQUIRE(bucket_.is_object()); const json::object& bucket = bucket_.as_object(); // It's hard to predict what these will be, so just assert their presence BOOST_TEST_REQUIRE(bucket.contains("anomaly_score")); if (isInterim) { BOOST_TEST_REQUIRE(bucket.contains("is_interim")); BOOST_REQUIRE_EQUAL(isInterim, bucket.at("is_interim").as_bool()); } else { BOOST_TEST_REQUIRE(!bucket.contains("is_interim")); } const json::value& recordsWrapper_ = arrayDoc.at(std::size_t(0)); BOOST_TEST_REQUIRE(recordsWrapper_.is_object()); const json::object& recordsWrapper = recordsWrapper_.as_object(); BOOST_TEST_REQUIRE(recordsWrapper.contains("records")); const json::value& records_ = recordsWrapper.at("records"); BOOST_TEST_REQUIRE(records_.is_array()); const json::array& records = records_.as_array(); double EXPECTED_PROBABILITIES[] = {0.01, 0.05, 0.001, 0.02, 0.0002}; int probIndex = 0; for (std::size_t i = 0; i < records.size(); i++) { BOOST_TEST_REQUIRE(records.at(i).as_object().contains("detector_index")); BOOST_TEST_REQUIRE(records.at(i).as_object().contains("initial_record_score")); BOOST_TEST_REQUIRE(records.at(i).as_object().contains("record_score")); BOOST_TEST_REQUIRE(records.at(i).as_object().contains("probability")); BOOST_REQUIRE_EQUAL( EXPECTED_PROBABILITIES[probIndex], records.at(i).as_object().at("probability").to_number<double>()); ++probIndex; if (isInterim) { BOOST_TEST_REQUIRE(records.at(i).as_object().contains("is_interim")); BOOST_REQUIRE_EQUAL( isInterim, records.at(i).as_object().at("is_interim").as_bool()); } else { BOOST_TEST_REQUIRE(!records.at(i).as_object().contains("is_interim")); } } BOOST_REQUIRE_EQUAL(std::size_t(2), records.size()); } { const json::value& bucketWrapper_ = arrayDoc.at(std::size_t(3)); BOOST_TEST_REQUIRE(bucketWrapper_.is_object()); const json::object& bucketWrapper = bucketWrapper_.as_object(); BOOST_TEST_REQUIRE(bucketWrapper.contains("bucket")); const json::value& bucket_ = bucketWrapper.at("bucket"); BOOST_TEST_REQUIRE(bucket_.is_object()); const json::object& bucket = bucket_.as_object(); // It's hard to predict what these will be, so just assert their presence BOOST_TEST_REQUIRE(bucket.contains("anomaly_score")); if (isInterim) { BOOST_TEST_REQUIRE(bucket.contains("is_interim")); BOOST_REQUIRE_EQUAL(isInterim, bucket.at("is_interim").as_bool()); } else { BOOST_TEST_REQUIRE(!bucket.contains("is_interim")); } const json::value& recordsWrapper_ = arrayDoc.at(std::size_t(2)); BOOST_TEST_REQUIRE(recordsWrapper_.is_object()); const json::object& recordsWrapper = recordsWrapper_.as_object(); BOOST_TEST_REQUIRE(recordsWrapper.contains("records")); const json::value& records_ = recordsWrapper.at("records"); BOOST_TEST_REQUIRE(records_.is_array()); const json::array& records = records_.as_array(); for (std::size_t i = 0; i < records.size(); i++) { //BOOST_REQUIRE_EQUAL(0.1, records1[std::size_t(0)]["probability").to_number<double>()); BOOST_TEST_REQUIRE(records.at(i).as_object().contains("detector_index")); BOOST_TEST_REQUIRE(records.at(i).as_object().contains("initial_record_score")); BOOST_TEST_REQUIRE(records.at(i).as_object().contains("record_score")); if (isInterim) { BOOST_TEST_REQUIRE(records.at(i).as_object().contains("is_interim")); BOOST_REQUIRE_EQUAL( isInterim, records.at(i).as_object().at("is_interim").as_bool()); } else { BOOST_TEST_REQUIRE(!records.at(i).as_object().contains("is_interim")); } } BOOST_REQUIRE_EQUAL(std::size_t(2), records.size()); } { const json::value& bucketWrapper_ = arrayDoc.at(std::size_t(5)); BOOST_TEST_REQUIRE(bucketWrapper_.is_object()); const json::object& bucketWrapper = bucketWrapper_.as_object(); BOOST_TEST_REQUIRE(bucketWrapper.contains("bucket")); const json::value& bucket_ = bucketWrapper.at("bucket"); BOOST_TEST_REQUIRE(bucket_.is_object()); const json::object& bucket = bucket_.as_object(); // It's hard to predict what these will be, so just assert their presence BOOST_TEST_REQUIRE(bucket.contains("anomaly_score")); if (isInterim) { BOOST_TEST_REQUIRE(bucket.contains("is_interim")); BOOST_REQUIRE_EQUAL(isInterim, bucket.at("is_interim").as_bool()); } else { BOOST_TEST_REQUIRE(!bucket.contains("is_interim")); } const json::value& recordsWrapper_ = arrayDoc.at(std::size_t(4)); BOOST_TEST_REQUIRE(recordsWrapper_.is_object()); const json::object& recordsWrapper = recordsWrapper_.as_object(); BOOST_TEST_REQUIRE(recordsWrapper.contains("records")); const json::value& records_ = recordsWrapper.at("records"); BOOST_TEST_REQUIRE(records_.is_array()); const json::array& records = records_.as_array(); for (std::size_t i = 0; i < records.size(); i++) { BOOST_TEST_REQUIRE(records.at(i).as_object().contains("detector_index")); //BOOST_REQUIRE_EQUAL(0.1, records1[std::size_t(0)]["probability").to_number<double>()); BOOST_TEST_REQUIRE(records.at(i).as_object().contains("initial_record_score")); BOOST_TEST_REQUIRE(records.at(i).as_object().contains("record_score")); if (isInterim) { BOOST_TEST_REQUIRE(records.at(i).as_object().contains("is_interim")); BOOST_REQUIRE_EQUAL( isInterim, records.at(i).as_object().at("is_interim").as_bool()); } else { BOOST_TEST_REQUIRE(!records.at(i).as_object().contains("is_interim")); } } BOOST_REQUIRE_EQUAL(std::size_t(2), records.size()); } } ml::model::CHierarchicalResults::TNode createInfluencerNode(const std::string& personName, const std::string& personValue, double probability, double normalisedAnomalyScore) { ml::model::CHierarchicalResults::TResultSpec spec; spec.s_PersonFieldName = personName; spec.s_PersonFieldValue = personValue; ml::model::CHierarchicalResults::TNode node; node.s_AnnotatedProbability.s_Probability = probability; node.s_NormalizedAnomalyScore = normalisedAnomalyScore; node.s_Spec = spec; return node; } ml::model::CHierarchicalResults::TNode createBucketInfluencerNode(const std::string& personName, double probability, double normalisedAnomalyScore, double rawAnomalyScore) { ml::model::CHierarchicalResults::TResultSpec spec; spec.s_PersonFieldName = personName; ml::model::CHierarchicalResults::TNode node; node.s_AnnotatedProbability.s_Probability = probability; node.s_NormalizedAnomalyScore = normalisedAnomalyScore; node.s_RawAnomalyScore = rawAnomalyScore; node.s_Spec = spec; return node; } void testThroughputHelper(bool useScopedAllocator) { // Write to /dev/null (Unix) or nul (Windows) std::ofstream ofs(ml::core::COsFileFuncs::NULL_FILENAME); BOOST_TEST_REQUIRE(ofs.is_open()); ml::core::CJsonOutputStreamWrapper outputStream(ofs); ml::api::CJsonOutputWriter writer("job", outputStream); std::string partitionFieldName("tfn"); std::string partitionFieldValue(""); std::string overFieldName("pfn"); std::string overFieldValue("pfv"); std::string byFieldName("airline"); std::string byFieldValue("GAL"); std::string correlatedByFieldValue("BAW"); std::string fieldName("responsetime"); std::string function("mean"); std::string functionDescription("mean(responsetime)"); std::string emptyString; ml::api::CHierarchicalResultsWriter::TOptionalStrOptionalStrPrDoublePrVec influences; ml::api::CHierarchicalResultsWriter::SResults result11( false, false, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 1, function, functionDescription, TDouble1Vec(1, 10090.0), TDouble1Vec(1, 6953.0), 2.24, 0.5, 0.0, 79, fieldName, influences, false, false, 1, 100); ml::api::CHierarchicalResultsWriter::SResults result112( false, true, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 1, function, functionDescription, TDouble1Vec(1, 10090.0), TDouble1Vec(1, 6953.0), 2.24, 0.5, 0.0, 79, fieldName, influences, false, false, 1, 100); ml::api::CHierarchicalResultsWriter::SResults result12( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 1, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 2.24, 0.8, 0.0, -5.0, fieldName, influences, false, true, 2, 100, EMPTY_STRING_LIST, {}); ml::api::CHierarchicalResultsWriter::SResults result13( ml::api::CHierarchicalResultsWriter::E_SimpleCountResult, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 1, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 2.24, 0.5, 0.0, -5.0, fieldName, influences, false, false, 3, 100, EMPTY_STRING_LIST, {}); ml::api::CHierarchicalResultsWriter::SResults result14( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 1, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 2.24, 0.0, 0.0, -5.0, fieldName, influences, false, false, 4, 100, EMPTY_STRING_LIST, {}); // 1st bucket writer.acceptBucketTimeInfluencer(1, 0.01, 13.44, 70.0); // Write the record this many times static const size_t TEST_SIZE(1); ml::core_t::TTime start(ml::core::CTimeUtils::now()); LOG_INFO(<< "Starting throughput test at " << ml::core::CTimeUtils::toTimeString(start)); for (size_t count = 0; count < TEST_SIZE; ++count) { if (useScopedAllocator) { using TScopedAllocator = ml::core::CScopedBoostJsonPoolAllocator<ml::api::CJsonOutputWriter>; static const std::string ALLOCATOR_ID("CAnomalyJob::writeOutResults"); TScopedAllocator scopedAllocator(ALLOCATOR_ID, writer); BOOST_TEST_REQUIRE(writer.acceptResult(result11)); BOOST_TEST_REQUIRE(writer.acceptResult(result11)); BOOST_TEST_REQUIRE(writer.acceptResult(result112)); BOOST_TEST_REQUIRE(writer.acceptResult(result12)); BOOST_TEST_REQUIRE(writer.acceptResult(result12)); BOOST_TEST_REQUIRE(writer.acceptResult(result13)); BOOST_TEST_REQUIRE(writer.acceptResult(result13)); BOOST_TEST_REQUIRE(writer.acceptResult(result14)); BOOST_TEST_REQUIRE(writer.acceptResult(result14)); // Finished adding results BOOST_TEST_REQUIRE(writer.endOutputBatch(false, 1U)); } else { BOOST_TEST_REQUIRE(writer.acceptResult(result11)); BOOST_TEST_REQUIRE(writer.acceptResult(result11)); BOOST_TEST_REQUIRE(writer.acceptResult(result112)); BOOST_TEST_REQUIRE(writer.acceptResult(result12)); BOOST_TEST_REQUIRE(writer.acceptResult(result12)); BOOST_TEST_REQUIRE(writer.acceptResult(result13)); BOOST_TEST_REQUIRE(writer.acceptResult(result13)); BOOST_TEST_REQUIRE(writer.acceptResult(result14)); BOOST_TEST_REQUIRE(writer.acceptResult(result14)); // Finished adding results BOOST_TEST_REQUIRE(writer.endOutputBatch(false, 1U)); } } ml::core_t::TTime end(ml::core::CTimeUtils::now()); LOG_INFO(<< "Finished throughput test at " << ml::core::CTimeUtils::toTimeString(end)); LOG_INFO(<< "Writing " << TEST_SIZE << " records took " << (end - start) << " seconds"); } } BOOST_AUTO_TEST_CASE(testGeoResultsWrite) { std::string partitionFieldName("tfn"); std::string partitionFieldValue(""); std::string overFieldName("ofn"); std::string overFieldValue("ofv"); std::string byFieldName("airline"); std::string byFieldValue("GAL"); std::string correlatedByFieldValue("BAW"); std::string fieldName("location"); std::string function("lat_long"); std::string functionDescription("lat_long(location)"); ml::api::CHierarchicalResultsWriter::TOptionalStrOptionalStrPrDoublePrVec influences; std::string emptyString; std::string mean_function("mean"); // The output writer won't close the JSON structures until is is destroyed { std::ostringstream sstream; { ml::core::CJsonOutputStreamWrapper outputStream(sstream); ml::api::CJsonOutputWriter writer("job", outputStream); TDouble1Vec actual(2, 0.0); actual[0] = 40.0; actual[1] = -40.0; TDouble1Vec typical(2, 0.0); typical[0] = 90.0; typical[1] = -90.0; ml::api::CHierarchicalResultsWriter::SResults result( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 1, function, functionDescription, 2.24, 79, typical, actual, 10.0, 10.0, 0.5, 0.0, fieldName, influences, false, true, 1, 1, EMPTY_STRING_LIST, {}); BOOST_TEST_REQUIRE(writer.acceptResult(result)); BOOST_TEST_REQUIRE(writer.endOutputBatch(false, 1U)); } json::error_code ec; json::value arrayDoc_ = json::parse(sstream.str(), ec); // Debug print record { LOG_DEBUG(<< "Results:\n" << arrayDoc_); } BOOST_TEST_REQUIRE(arrayDoc_.is_array()); const json::array& arrayDoc = arrayDoc_.as_array(); BOOST_REQUIRE_EQUAL(std::size_t(2), arrayDoc.size()); BOOST_TEST_REQUIRE(arrayDoc.at(std::size_t(0)).as_object().contains("records")); const json::value& record_ = arrayDoc.at(std::size_t(0)).as_object().at("records").as_array().at(std::size_t(0)); const json::object& record = record_.as_object(); BOOST_TEST_REQUIRE(record.contains("typical")); BOOST_TEST_REQUIRE(record.contains("actual")); BOOST_TEST_REQUIRE(record.contains("geo_results")); auto geoResultsObject = record.at("geo_results").as_object(); BOOST_TEST_REQUIRE(geoResultsObject.contains("actual_point")); BOOST_REQUIRE_EQUAL("40.000000000000,-40.000000000000", geoResultsObject.at("actual_point").as_string()); BOOST_TEST_REQUIRE(geoResultsObject.contains("typical_point")); BOOST_REQUIRE_EQUAL("90.000000000000,-90.000000000000", geoResultsObject.at("typical_point").as_string()); } { std::ostringstream sstream; { ml::core::CJsonOutputStreamWrapper outputStream(sstream); ml::api::CJsonOutputWriter writer("job", outputStream); TDouble1Vec actual(1, 500); TDouble1Vec typical(1, 64); ml::api::CHierarchicalResultsWriter::SResults result( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 1, function, functionDescription, 2.24, 79, typical, actual, 10.0, 10.0, 0.5, 0.0, fieldName, influences, false, true, 1, 1, EMPTY_STRING_LIST, {}); BOOST_TEST_REQUIRE(writer.acceptResult(result)); BOOST_TEST_REQUIRE(writer.endOutputBatch(false, 1U)); } json::error_code ec; json::value arrayDoc_ = json::parse(sstream.str(), ec); BOOST_TEST_REQUIRE(ec.failed() == false); // Debug print record LOG_DEBUG(<< "Results:\n" << arrayDoc_); BOOST_TEST_REQUIRE(arrayDoc_.is_array()); const json::array& arrayDoc = arrayDoc_.as_array(); BOOST_REQUIRE_EQUAL(std::size_t(2), arrayDoc.size()); BOOST_TEST_REQUIRE(arrayDoc.at(std::size_t(0)).as_object().contains("records")); const json::value& record_ = arrayDoc.at(std::size_t(0)).at("records").as_array().at(std::size_t(0)); BOOST_TEST_REQUIRE(record_.is_object()); json::object record = record_.as_object(); BOOST_TEST_REQUIRE(record.contains("geo_results")); auto geoResultsObject = record.at("geo_results").as_object(); BOOST_TEST_REQUIRE(!geoResultsObject.contains("actual_point")); BOOST_TEST_REQUIRE(!geoResultsObject.contains("typical_point")); } { std::ostringstream sstream; { ml::core::CJsonOutputStreamWrapper outputStream(sstream); ml::api::CJsonOutputWriter writer("job", outputStream); TDouble1Vec actual(1, 500); TDouble1Vec typical(1, 64); ml::api::CHierarchicalResultsWriter::SResults result( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 1, mean_function, functionDescription, 2.24, 79, typical, actual, 10.0, 10.0, 0.5, 0.0, fieldName, influences, false, true, 1, 1, EMPTY_STRING_LIST, {}); BOOST_TEST_REQUIRE(writer.acceptResult(result)); BOOST_TEST_REQUIRE(writer.endOutputBatch(false, 1U)); } json::error_code ec; json::value arrayDoc_ = json::parse(sstream.str(), ec); BOOST_TEST_REQUIRE(ec.failed() == false); // Debug print record LOG_DEBUG(<< "Results:\n" << arrayDoc_); BOOST_TEST_REQUIRE(arrayDoc_.is_array()); const json::array& arrayDoc = arrayDoc_.as_array(); BOOST_REQUIRE_EQUAL(std::size_t(2), arrayDoc.size()); BOOST_TEST_REQUIRE(arrayDoc.at(std::size_t(0)).as_object().contains("records")); const json::value& record = arrayDoc.at(std::size_t(0)).at("records").as_array().at(std::size_t(0)); BOOST_TEST_REQUIRE(record.is_object()); BOOST_REQUIRE_EQUAL(false, record.as_object().contains("geo_results")); } } BOOST_AUTO_TEST_CASE(testWriteNonAnomalousBucket) { std::ostringstream sstream; std::string function("mean"); std::string functionDescription("mean(responsetime)"); std::string emptyString; ml::api::CHierarchicalResultsWriter::TOptionalStrOptionalStrPrDoublePrVec influences; { ml::core::CJsonOutputStreamWrapper outputStream(sstream); ml::api::CJsonOutputWriter writer("job", outputStream); ml::api::CHierarchicalResultsWriter::SResults result( false, false, emptyString, emptyString, emptyString, emptyString, emptyString, emptyString, emptyString, 1, function, functionDescription, TDouble1Vec(1, 42.0), TDouble1Vec(1, 42.0), 0.0, 0.0, 1.0, 30, emptyString, influences, false, false, 1, 100); BOOST_TEST_REQUIRE(writer.acceptResult(result)); writer.acceptBucketTimeInfluencer(1, 1.0, 0.0, 0.0); BOOST_TEST_REQUIRE(writer.endOutputBatch(false, 10U)); writer.finalise(); } json::error_code ec; json::value arrayDoc_ = json::parse(sstream.str(), ec); BOOST_TEST_REQUIRE(ec.failed() == false); // Debug print record LOG_DEBUG(<< "Results:\n" << arrayDoc_); BOOST_TEST_REQUIRE(arrayDoc_.is_array()); const json::array& arrayDoc = arrayDoc_.as_array(); BOOST_REQUIRE_EQUAL(std::size_t(1), arrayDoc.size()); const json::value& bucketWrapper_ = arrayDoc.at(std::size_t(0)); BOOST_TEST_REQUIRE(bucketWrapper_.is_object()); const json::object& bucketWrapper = bucketWrapper_.as_object(); BOOST_TEST_REQUIRE(bucketWrapper.contains("bucket")); const json::value& bucket_ = bucketWrapper_.at("bucket"); const json::object& bucket = bucket_.as_object(); BOOST_TEST_REQUIRE(bucket.contains("job_id")); BOOST_REQUIRE_EQUAL("job", bucket.at("job_id").as_string()); BOOST_REQUIRE_EQUAL(1000, bucket.at("timestamp").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(bucket.contains("bucket_influencers") == false); BOOST_REQUIRE_EQUAL(0, bucket.at("event_count").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(bucket.contains("anomaly_score")); BOOST_REQUIRE_CLOSE_ABSOLUTE(0.0, bucket.at("anomaly_score").to_number<double>(), 0.00001); } BOOST_AUTO_TEST_CASE(testFlush) { std::string testId("testflush"); ml::core_t::TTime lastFinalizedBucketEnd(123456789); std::ostringstream sstream; { ml::core::CJsonOutputStreamWrapper outputStream(sstream); ml::api::CJsonOutputWriter writer("job", outputStream); writer.acknowledgeFlush(testId, lastFinalizedBucketEnd); } json::error_code ec; json::value arrayDoc_ = json::parse(sstream.str(), ec); BOOST_TEST_REQUIRE(ec.failed() == false); BOOST_TEST_REQUIRE(arrayDoc_.is_array()); const json::array& arrayDoc = arrayDoc_.as_array(); LOG_DEBUG(<< "Flush:\n" << arrayDoc); const json::value& flushWrapper_ = arrayDoc.at(std::size_t(0)); BOOST_TEST_REQUIRE(flushWrapper_.is_object()); const json::object& flushWrapper = flushWrapper_.as_object(); BOOST_TEST_REQUIRE(flushWrapper.contains("flush")); const json::value& flush_ = flushWrapper.at("flush"); BOOST_TEST_REQUIRE(flush_.is_object()); const json::object& flush = flush_.as_object(); BOOST_TEST_REQUIRE(flush.contains("id")); BOOST_REQUIRE_EQUAL(testId, flush.at("id").as_string()); BOOST_TEST_REQUIRE(flush.contains("last_finalized_bucket_end")); BOOST_REQUIRE_EQUAL( lastFinalizedBucketEnd * 1000, static_cast<ml::core_t::TTime>( flush.at("last_finalized_bucket_end").to_number<std::int64_t>())); } BOOST_AUTO_TEST_CASE(testWriteCategoryDefinition) { ml::api::CGlobalCategoryId categoryId{42}; std::string terms("foo bar"); std::string regex(".*?foo.+?bar.*"); std::size_t maxMatchingLength(132); ml::api::CJsonOutputWriter::TStrFSet examples; examples.insert("User foo failed to log in"); examples.insert("User bar failed to log in"); std::ostringstream sstream; { ml::core::CJsonOutputStreamWrapper outputStream(sstream); ml::api::CJsonOutputWriter writer("job", outputStream); writer.writeCategoryDefinition("", "", categoryId, terms, regex, maxMatchingLength, examples, 0, {}); } json::error_code ec; json::value arrayDoc_ = json::parse(sstream.str(), ec); BOOST_TEST_REQUIRE(ec.failed() == false); BOOST_TEST_REQUIRE(arrayDoc_.is_array()); const json::array& arrayDoc = arrayDoc_.as_array(); LOG_DEBUG(<< "CategoryDefinition:\n" << arrayDoc); const json::value& categoryWrapper_ = arrayDoc.at(std::size_t(0)); BOOST_TEST_REQUIRE(categoryWrapper_.is_object()); const json::object& categoryWrapper = categoryWrapper_.as_object(); BOOST_TEST_REQUIRE(categoryWrapper.contains("category_definition")); const json::value& category_ = categoryWrapper.at("category_definition"); BOOST_TEST_REQUIRE(category_.is_object()); json::object category = category_.as_object(); BOOST_TEST_REQUIRE(category.contains("job_id")); BOOST_REQUIRE_EQUAL("job", category.at("job_id").as_string()); BOOST_TEST_REQUIRE(category.contains("partition_field_value") == false); BOOST_TEST_REQUIRE(category.contains("category_id")); BOOST_REQUIRE_EQUAL(categoryId.globalId(), category.at("category_id").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(category.contains("terms")); BOOST_REQUIRE_EQUAL(terms, category.at("terms").as_string()); BOOST_TEST_REQUIRE(category.contains("regex")); BOOST_REQUIRE_EQUAL(regex, category.at("regex").as_string()); BOOST_TEST_REQUIRE(category.contains("max_matching_length")); BOOST_REQUIRE_EQUAL(maxMatchingLength, static_cast<std::size_t>( category.at("max_matching_length").to_number<std::int64_t>())); BOOST_TEST_REQUIRE(category.contains("examples")); ml::api::CJsonOutputWriter::TStrFSet writtenExamplesSet; const json::value& writtenExamples_ = category.at("examples"); const json::array& writtenExamples = writtenExamples_.as_array(); for (std::size_t i = 0; i < writtenExamples.size(); i++) { writtenExamplesSet.insert(std::string(writtenExamples.at(i).as_string())); } BOOST_TEST_REQUIRE(writtenExamplesSet == examples); } BOOST_AUTO_TEST_CASE(testWritePerPartitionCategoryDefinition) { ml::api::CGlobalCategoryId categoryId{42}; std::string terms("foo bar"); std::string regex(".*?foo.+?bar.*"); std::size_t maxMatchingLength(132); ml::api::CJsonOutputWriter::TStrFSet examples; examples.insert("User foo failed to log in"); examples.insert("User bar failed to log in"); std::ostringstream sstream; { ml::core::CJsonOutputStreamWrapper outputStream(sstream); ml::api::CJsonOutputWriter writer("job", outputStream); writer.writeCategoryDefinition("event.dataset", "elasticsearch", categoryId, terms, regex, maxMatchingLength, examples, 0, {}); } json::error_code ec; json::value arrayDoc_ = json::parse(sstream.str(), ec); BOOST_TEST_REQUIRE(ec.failed() == false); BOOST_TEST_REQUIRE(arrayDoc_.is_array()); const json::array& arrayDoc = arrayDoc_.as_array(); LOG_DEBUG(<< "CategoryDefinition:\n" << arrayDoc); const json::value& categoryWrapper_ = arrayDoc.at(std::size_t(0)); BOOST_TEST_REQUIRE(categoryWrapper_.is_object()); const json::object& categoryWrapper = categoryWrapper_.as_object(); BOOST_TEST_REQUIRE(categoryWrapper.contains("category_definition")); const json::value& category_ = categoryWrapper.at("category_definition"); BOOST_TEST_REQUIRE(category_.is_object()); const json::object& category = category_.as_object(); BOOST_TEST_REQUIRE(category.contains("job_id")); BOOST_REQUIRE_EQUAL("job", category.at("job_id").as_string()); BOOST_TEST_REQUIRE(category.contains("partition_field_name")); BOOST_REQUIRE_EQUAL("event.dataset", category.at("partition_field_name").as_string()); BOOST_TEST_REQUIRE(category.contains("partition_field_value")); BOOST_REQUIRE_EQUAL("elasticsearch", category.at("partition_field_value").as_string()); BOOST_TEST_REQUIRE(category.contains("category_id")); BOOST_REQUIRE_EQUAL(categoryId.globalId(), category.at("category_id").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(category.contains("terms")); BOOST_REQUIRE_EQUAL(terms, category.at("terms").as_string()); BOOST_TEST_REQUIRE(category.contains("regex")); BOOST_REQUIRE_EQUAL(regex, category.at("regex").as_string()); BOOST_TEST_REQUIRE(category.contains("max_matching_length")); BOOST_REQUIRE_EQUAL(maxMatchingLength, static_cast<std::size_t>( category.at("max_matching_length").to_number<std::int64_t>())); BOOST_TEST_REQUIRE(category.contains("examples")); ml::api::CJsonOutputWriter::TStrFSet writtenExamplesSet; const json::value& writtenExamples_ = category.at("examples"); const json::array& writtenExamples = writtenExamples_.as_array(); for (std::size_t i = 0; i < writtenExamples.size(); i++) { writtenExamplesSet.insert(std::string(writtenExamples.at(i).as_string())); } BOOST_TEST_REQUIRE(writtenExamplesSet == examples); } BOOST_AUTO_TEST_CASE(testBucketWrite) { testBucketWriteHelper(false); } BOOST_AUTO_TEST_CASE(testBucketWriteInterim) { testBucketWriteHelper(true); } BOOST_AUTO_TEST_CASE(testLimitedRecordsWrite) { testLimitedRecordsWriteHelper(false); } BOOST_AUTO_TEST_CASE(testLimitedRecordsWriteInterim) { testLimitedRecordsWriteHelper(true); } BOOST_AUTO_TEST_CASE(testWriteInfluencers) { std::ostringstream sstream; { std::string user("user"); std::string daisy("daisy"); std::string jim("jim"); ml::model::CHierarchicalResults::TNode node1 = createInfluencerNode(user, daisy, 0.5, 10.0); ml::model::CHierarchicalResults::TNode node2 = createInfluencerNode(user, jim, 0.9, 100.0); ml::core::CJsonOutputStreamWrapper outputStream(sstream); ml::api::CJsonOutputWriter writer("job", outputStream); BOOST_TEST_REQUIRE(writer.acceptInfluencer(ml::core_t::TTime(42), node1, false)); BOOST_TEST_REQUIRE(writer.acceptInfluencer(ml::core_t::TTime(42), node2, false)); // Finished adding results BOOST_TEST_REQUIRE(writer.endOutputBatch(true, 1U)); } json::error_code ec; json::value doc_ = json::parse(sstream.str(), ec); BOOST_TEST_REQUIRE(ec.failed() == false); BOOST_TEST_REQUIRE(doc_.is_array()); const json::array& doc = doc_.as_array(); LOG_DEBUG(<< "influencers:\n" << doc); BOOST_REQUIRE_EQUAL(std::size_t(2), doc.size()); const json::value& influencers_ = doc.at(std::size_t(0)).as_object().at("influencers"); BOOST_TEST_REQUIRE(influencers_.is_array()); const json::array& influencers = influencers_.as_array(); BOOST_REQUIRE_EQUAL(std::size_t(2), influencers.size()); const json::value& influencer_ = influencers.at(std::size_t(0)); const json::object& influencer = influencer_.as_object(); BOOST_TEST_REQUIRE(influencer.contains("job_id")); BOOST_REQUIRE_EQUAL("job", influencer.at("job_id").as_string()); BOOST_REQUIRE_CLOSE_ABSOLUTE(0.5, influencer.at("probability").to_number<double>(), 0.001); BOOST_REQUIRE_CLOSE_ABSOLUTE( 10.0, influencer.at("initial_influencer_score").to_number<double>(), 0.001); BOOST_TEST_REQUIRE(influencer.contains("influencer_score")); BOOST_REQUIRE_CLOSE_ABSOLUTE( 10.0, influencer.at("influencer_score").to_number<double>(), 0.001); BOOST_REQUIRE_EQUAL("user", influencer.at("influencer_field_name").as_string()); BOOST_REQUIRE_EQUAL("daisy", influencer.at("influencer_field_value").as_string()); BOOST_REQUIRE_EQUAL(42000, influencer.at("timestamp").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(influencer.at("is_interim").as_bool()); BOOST_TEST_REQUIRE(influencer.contains("bucket_span")); const json::value& influencer2_ = influencers.at(std::size_t(1)); const json::object& influencer2 = influencer2_.as_object(); BOOST_REQUIRE_CLOSE_ABSOLUTE( 0.9, influencer2.at("probability").to_number<double>(), 0.001); BOOST_REQUIRE_CLOSE_ABSOLUTE( 100.0, influencer2.at("initial_influencer_score").to_number<double>(), 0.001); BOOST_TEST_REQUIRE(influencer2.contains("influencer_score")); BOOST_REQUIRE_CLOSE_ABSOLUTE( 100.0, influencer2.at("influencer_score").to_number<double>(), 0.001); BOOST_REQUIRE_EQUAL("user", influencer2.at("influencer_field_name").as_string()); BOOST_REQUIRE_EQUAL("jim", influencer2.at("influencer_field_value").as_string()); BOOST_REQUIRE_EQUAL(42000, influencer2.at("timestamp").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(influencer2.at("is_interim").as_bool()); BOOST_TEST_REQUIRE(influencer2.contains("bucket_span")); const json::value& bucket = doc.at(std::size_t(1)).as_object().at("bucket"); BOOST_TEST_REQUIRE(bucket.as_object().contains("influencers") == false); } BOOST_AUTO_TEST_CASE(testWriteInfluencersWithLimit) { std::ostringstream sstream; { std::string user("user"); std::string computer("computer"); std::string monitor("monitor"); std::string daisy("daisy"); std::string jim("jim"); std::string bob("bob"); std::string laptop("laptop"); ml::model::CHierarchicalResults::TNode node1 = createInfluencerNode(user, daisy, 0.5, 10.0); ml::model::CHierarchicalResults::TNode node2 = createInfluencerNode(user, jim, 0.9, 100.0); ml::model::CHierarchicalResults::TNode node3 = createInfluencerNode(user, bob, 0.3, 9.0); ml::model::CHierarchicalResults::TNode node4 = createInfluencerNode(computer, laptop, 0.3, 12.0); ml::model::CHierarchicalResults::TNode bnode1 = createBucketInfluencerNode(user, 0.5, 10.0, 1.0); ml::model::CHierarchicalResults::TNode bnode2 = createBucketInfluencerNode(computer, 0.9, 100.0, 10.0); ml::model::CHierarchicalResults::TNode bnode3 = createBucketInfluencerNode(monitor, 0.3, 9.0, 0.9); ml::core::CJsonOutputStreamWrapper outputStream(sstream); ml::api::CJsonOutputWriter writer("job", outputStream); writer.limitNumberRecords(2); BOOST_TEST_REQUIRE(writer.acceptInfluencer(ml::core_t::TTime(0), node1, false)); BOOST_TEST_REQUIRE(writer.acceptInfluencer(ml::core_t::TTime(0), node2, false)); BOOST_TEST_REQUIRE(writer.acceptInfluencer(ml::core_t::TTime(0), node3, false)); BOOST_TEST_REQUIRE(writer.acceptInfluencer(ml::core_t::TTime(0), node4, false)); BOOST_TEST_REQUIRE(writer.acceptInfluencer(ml::core_t::TTime(0), bnode1, true)); BOOST_TEST_REQUIRE(writer.acceptInfluencer(ml::core_t::TTime(0), bnode2, true)); BOOST_TEST_REQUIRE(writer.acceptInfluencer(ml::core_t::TTime(0), bnode3, true)); // can't add a bucket influencer unless a result has been added std::string pfn("partition_field_name"); std::string pfv("partition_field_value"); std::string bfn("by_field_name"); std::string bfv("by_field_value"); std::string fun("function"); std::string fund("function_description"); std::string fn("field_name"); std::string emptyStr; ml::api::CHierarchicalResultsWriter::TOptionalStrOptionalStrPrDoublePrVec influences; ml::api::CHierarchicalResultsWriter::SResults result( ml::api::CHierarchicalResultsWriter::E_Result, pfn, pfv, bfn, bfv, emptyStr, 0, fun, fund, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 0.0, 0.1, 0.1, -5.0, fn, influences, false, true, 1, 100, EMPTY_STRING_LIST, {}); BOOST_TEST_REQUIRE(writer.acceptResult(result)); writer.acceptBucketTimeInfluencer(ml::core_t::TTime(0), 0.6, 1.0, 10.0); // Finished adding results BOOST_TEST_REQUIRE(writer.endOutputBatch(false, 1U)); } json::error_code ec; json::value doc_ = json::parse(sstream.str(), ec); BOOST_TEST_REQUIRE(ec.failed() == false); BOOST_TEST_REQUIRE(doc_.is_array()); const json::array& doc = doc_.as_array(); LOG_DEBUG(<< "limited write influencers:\n" << doc); const json::value& influencers_ = doc.at(std::size_t(1)).as_object().at("influencers"); BOOST_TEST_REQUIRE(influencers_.is_array()); const json::array& influencers = influencers_.as_array(); BOOST_REQUIRE_EQUAL(std::size_t(2), influencers.size()); const json::value& influencer_ = influencers.at(std::size_t(0)); const json::object& influencer = influencer_.as_object(); BOOST_REQUIRE_CLOSE_ABSOLUTE(0.9, influencer.at("probability").to_number<double>(), 0.001); BOOST_REQUIRE_CLOSE_ABSOLUTE( 100.0, influencer.at("initial_influencer_score").to_number<double>(), 0.001); BOOST_TEST_REQUIRE(influencer.contains("influencer_score")); BOOST_REQUIRE_CLOSE_ABSOLUTE( 100.0, influencer.at("influencer_score").to_number<double>(), 0.001); BOOST_REQUIRE_EQUAL("user", influencer.at("influencer_field_name").as_string()); BOOST_REQUIRE_EQUAL("jim", influencer.at("influencer_field_value").as_string()); BOOST_TEST_REQUIRE(influencer.contains("bucket_span")); const json::value& influencer2_ = influencers.at(std::size_t(1)); const json::object& influencer2 = influencer2_.as_object(); BOOST_REQUIRE_CLOSE_ABSOLUTE( 0.3, influencer2.at("probability").to_number<double>(), 0.001); BOOST_REQUIRE_CLOSE_ABSOLUTE( 12.0, influencer2.at("initial_influencer_score").to_number<double>(), 0.001); BOOST_TEST_REQUIRE(influencer2.contains("influencer_score")); BOOST_REQUIRE_CLOSE_ABSOLUTE( 12.0, influencer2.at("influencer_score").to_number<double>(), 0.001); BOOST_REQUIRE_EQUAL("computer", influencer2.at("influencer_field_name").as_string()); BOOST_REQUIRE_EQUAL("laptop", influencer2.at("influencer_field_value").as_string()); BOOST_TEST_REQUIRE(influencer2.contains("bucket_span")); // bucket influencers const json::value& bucketResult_ = doc.at(std::size_t(2)).as_object().at("bucket"); const json::object& bucketResult = bucketResult_.as_object(); BOOST_TEST_REQUIRE(bucketResult.contains("bucket_influencers")); const json::value& bucketInfluencers_ = bucketResult.at("bucket_influencers"); BOOST_TEST_REQUIRE(bucketInfluencers_.is_array()); const json::array& bucketInfluencers = bucketInfluencers_.as_array(); BOOST_REQUIRE_EQUAL(std::size_t(3), bucketInfluencers.size()); const json::value& binf_ = bucketInfluencers.at(std::size_t(0)); BOOST_TEST_REQUIRE(binf_.is_object()); const json::object& binf = binf_.as_object(); BOOST_REQUIRE_CLOSE_ABSOLUTE(0.9, binf.at("probability").to_number<double>(), 0.001); BOOST_REQUIRE_CLOSE_ABSOLUTE( 100.0, binf.at("initial_anomaly_score").to_number<double>(), 0.001); BOOST_TEST_REQUIRE(binf.contains("anomaly_score")); BOOST_REQUIRE_CLOSE_ABSOLUTE(100.0, binf.at("anomaly_score").to_number<double>(), 0.001); BOOST_REQUIRE_EQUAL("computer", binf.at("influencer_field_name").as_string()); BOOST_REQUIRE_CLOSE_ABSOLUTE( 10.0, binf.at("raw_anomaly_score").to_number<double>(), 0.001); const json::value& binf2_ = bucketInfluencers.at(std::size_t(1)); BOOST_TEST_REQUIRE(binf2_.is_object()); const json::object& binf2 = binf2_.as_object(); BOOST_REQUIRE_CLOSE_ABSOLUTE(0.5, binf2.at("probability").to_number<double>(), 0.001); BOOST_REQUIRE_CLOSE_ABSOLUTE( 10.0, binf2.at("initial_anomaly_score").to_number<double>(), 0.001); BOOST_TEST_REQUIRE(binf2.contains("anomaly_score")); BOOST_REQUIRE_CLOSE_ABSOLUTE(10.0, binf2.at("anomaly_score").to_number<double>(), 0.001); BOOST_REQUIRE_EQUAL("user", binf2.at("influencer_field_name").as_string()); BOOST_REQUIRE_CLOSE_ABSOLUTE( 1.0, binf2.at("raw_anomaly_score").to_number<double>(), 0.001); const json::value& binf3_ = bucketInfluencers.at(std::size_t(2)); BOOST_TEST_REQUIRE(binf3_.is_object()); const json::object& binf3 = binf3_.as_object(); BOOST_REQUIRE_CLOSE_ABSOLUTE(0.6, binf3.at("probability").to_number<double>(), 0.001); BOOST_REQUIRE_CLOSE_ABSOLUTE( 10.0, binf3.at("initial_anomaly_score").to_number<double>(), 0.001); BOOST_TEST_REQUIRE(binf3.contains("anomaly_score")); BOOST_REQUIRE_CLOSE_ABSOLUTE(10.0, binf3.at("anomaly_score").to_number<double>(), 0.001); BOOST_REQUIRE_EQUAL("bucket_time", binf3.at("influencer_field_name").as_string()); BOOST_REQUIRE_CLOSE_ABSOLUTE( 1.0, binf3.at("raw_anomaly_score").to_number<double>(), 0.001); } BOOST_AUTO_TEST_CASE(testWriteWithInfluences) { std::ostringstream sstream; { std::string partitionFieldName("tfn"); std::string partitionFieldValue("tfv"); std::string overFieldName("pfn"); std::string overFieldValue("pfv"); std::string byFieldName("airline"); std::string byFieldValue("GAL"); std::string fieldName("responsetime"); std::string function("mean"); std::string functionDescription("mean(responsetime)"); std::string emptyString; ml::api::CHierarchicalResultsWriter::TOptionalStrOptionalStrPrDoublePrVec influences; std::string user("user"); std::string dave("dave"); std::string jo("jo"); std::string cat("cat"); std::string host("host"); std::string localhost("localhost"); std::string webserver("web-server"); ml::api::CHierarchicalResultsWriter::TOptionalStrOptionalStrPr field1(user, dave); ml::api::CHierarchicalResultsWriter::TOptionalStrOptionalStrPr field2(user, cat); ml::api::CHierarchicalResultsWriter::TOptionalStrOptionalStrPr field3(user, jo); ml::api::CHierarchicalResultsWriter::TOptionalStrOptionalStrPr hostField1(host, localhost); ml::api::CHierarchicalResultsWriter::TOptionalStrOptionalStrPr hostField2(host, webserver); influences.emplace_back(field1, 0.4); influences.emplace_back(field2, 1.0); influences.emplace_back(hostField1, 0.7); influences.emplace_back(field3, 0.1); influences.emplace_back(hostField2, 0.8); // The output writer won't close the JSON structures until it is destroyed ml::api::CHierarchicalResultsWriter::SResults result( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, emptyString, 1, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 0.0, 0.1, 0.1, -5.0, fieldName, influences, false, true, 1, 100, EMPTY_STRING_LIST, {}); ml::core::CJsonOutputStreamWrapper outputStream(sstream); ml::api::CJsonOutputWriter writer("job", outputStream); BOOST_TEST_REQUIRE(writer.acceptResult(result)); // Finished adding results BOOST_TEST_REQUIRE(writer.endOutputBatch(false, 1U)); } json::error_code ec; json::value doc_ = json::parse(sstream.str(), ec); BOOST_TEST_REQUIRE(ec.failed() == false); // Debug print record LOG_DEBUG(<< "Results:\n" << doc_); BOOST_TEST_REQUIRE(doc_.is_array()); const json::array& doc = doc_.as_array(); BOOST_TEST_REQUIRE(doc.at(std::size_t(1)).as_object().contains("bucket")); const json::value& bucket_ = doc.at(std::size_t(1)).as_object().at("bucket"); const json::object& bucket = bucket_.as_object(); BOOST_TEST_REQUIRE(bucket.contains("records") == false); BOOST_TEST_REQUIRE(doc.at(std::size_t(0)).as_object().contains("records")); const json::value& records_ = doc.at(std::size_t(0)).as_object().at("records"); const json::array& records = records_.as_array(); BOOST_TEST_REQUIRE(records.at(std::size_t(0)).as_object().contains("influencers")); const json::value& influences_ = records.at(std::size_t(0)).as_object().at("influencers"); BOOST_TEST_REQUIRE(influences_.is_array()); const json::array& influences = influences_.as_array(); BOOST_REQUIRE_EQUAL(std::size_t(2), influences.size()); { const json::value& influence_ = influences.at(std::size_t(0)); const json::object& influence = influence_.as_object(); BOOST_TEST_REQUIRE(influence.contains("influencer_field_name")); BOOST_REQUIRE_EQUAL("host", influence.at("influencer_field_name").as_string()); BOOST_TEST_REQUIRE(influence.contains("influencer_field_values")); const json::value& influencerFieldValues_ = influence.at("influencer_field_values"); BOOST_TEST_REQUIRE(influencerFieldValues_.is_array()); const json::array& influencerFieldValues = influencerFieldValues_.as_array(); BOOST_REQUIRE_EQUAL(std::size_t(2), influencerFieldValues.size()); // Check influencers are ordered BOOST_REQUIRE_EQUAL("web-server", influencerFieldValues.at(std::size_t(0)).as_string()); BOOST_REQUIRE_EQUAL("localhost", influencerFieldValues.at(std::size_t(1)).as_string()); } { const json::value& influence_ = influences.at(std::size_t(1)); const json::object& influence = influence_.as_object(); BOOST_TEST_REQUIRE(influence.contains("influencer_field_name")); BOOST_REQUIRE_EQUAL("user", influence.at("influencer_field_name").as_string()); BOOST_TEST_REQUIRE(influence.contains("influencer_field_values")); const json::value& influencerFieldValues_ = influence.at("influencer_field_values"); BOOST_TEST_REQUIRE(influencerFieldValues_.is_array()); const json::array& influencerFieldValues = influencerFieldValues_.as_array(); BOOST_REQUIRE_EQUAL(std::size_t(3), influencerFieldValues.size()); // Check influencers are ordered BOOST_REQUIRE_EQUAL("cat", influencerFieldValues.at(std::size_t(0)).as_string()); BOOST_REQUIRE_EQUAL("dave", influencerFieldValues.at(std::size_t(1)).as_string()); BOOST_REQUIRE_EQUAL("jo", influencerFieldValues.at(std::size_t(2)).as_string()); } } BOOST_AUTO_TEST_CASE(testPersistNormalizer) { ml::model::CAnomalyDetectorModelConfig modelConfig = ml::model::CAnomalyDetectorModelConfig::defaultConfig(); std::ostringstream sstream; ml::core_t::TTime persistTime(1); { ml::core::CJsonOutputStreamWrapper outputStream(sstream); ml::api::CJsonOutputWriter writer("job", outputStream); ml::model::CLimits limits(false); ml::model::CHierarchicalResultsNormalizer normalizer(limits, modelConfig); writer.persistNormalizer(normalizer, persistTime); writer.finalise(); } json::error_code ec; json::value doc_ = json::parse(sstream.str(), ec); BOOST_TEST_REQUIRE(ec.failed() == false); // Debug print record LOG_DEBUG(<< "Results:\n" << doc_); BOOST_TEST_REQUIRE(doc_.is_array()); const json::array& doc = doc_.as_array(); BOOST_TEST_REQUIRE(persistTime <= ml::core::CTimeUtils::now()); BOOST_TEST_REQUIRE(persistTime > ml::core::CTimeUtils::now() - 10); const json::value& quantileWrapper_ = doc.at(std::size_t(0)); const json::object& quantileWrapper = quantileWrapper_.as_object(); BOOST_TEST_REQUIRE(quantileWrapper.contains("quantiles")); const json::value& quantileState_ = quantileWrapper.at("quantiles"); const json::object& quantileState = quantileState_.as_object(); BOOST_TEST_REQUIRE(quantileState.contains("job_id")); BOOST_REQUIRE_EQUAL("job", quantileState.at("job_id").as_string()); BOOST_TEST_REQUIRE(quantileState.contains("quantile_state")); BOOST_TEST_REQUIRE(quantileState.contains("timestamp")); } BOOST_AUTO_TEST_CASE(testReportMemoryUsage) { std::ostringstream sstream; { ml::core::CJsonOutputStreamWrapper outputStream(sstream); ml::api::CJsonOutputWriter writer("job", outputStream); ml::model::CResourceMonitor::SModelSizeStats resourceUsage; resourceUsage.s_Usage = 1; resourceUsage.s_AdjustedUsage = 2; resourceUsage.s_PeakUsage = 3; resourceUsage.s_AdjustedPeakUsage = 4; resourceUsage.s_ByFields = 5; resourceUsage.s_PartitionFields = 6; resourceUsage.s_OverFields = 7; resourceUsage.s_AllocationFailures = 8; resourceUsage.s_MemoryStatus = ml::model_t::E_MemoryStatusHardLimit; resourceUsage.s_AssignmentMemoryBasis = ml::model_t::E_AssignmentBasisCurrentModelBytes; resourceUsage.s_BucketStartTime = 9; resourceUsage.s_BytesExceeded = 10; resourceUsage.s_BytesMemoryLimit = 11; resourceUsage.s_OverallCategorizerStats.s_CategorizedMessages = 12; resourceUsage.s_OverallCategorizerStats.s_TotalCategories = 13; resourceUsage.s_OverallCategorizerStats.s_FrequentCategories = 14; resourceUsage.s_OverallCategorizerStats.s_RareCategories = 15; resourceUsage.s_OverallCategorizerStats.s_DeadCategories = 16; resourceUsage.s_OverallCategorizerStats.s_MemoryCategorizationFailures = 17; resourceUsage.s_OverallCategorizerStats.s_CategorizationStatus = ml::model_t::E_CategorizationStatusWarn; writer.reportMemoryUsage(resourceUsage); writer.endOutputBatch(false, 1ul); } LOG_DEBUG(<< sstream.str()); json::error_code ec; json::value doc_ = json::parse(sstream.str(), ec); BOOST_TEST_REQUIRE(ec.failed() == false); BOOST_TEST_REQUIRE(doc_.is_array()); const json::array& doc = doc_.as_array(); const json::value& resourceWrapper_ = doc.at(std::size_t(0)); const json::object& resourceWrapper = resourceWrapper_.as_object(); BOOST_TEST_REQUIRE(resourceWrapper.contains("model_size_stats")); const json::value& sizeStats_ = resourceWrapper.at("model_size_stats"); const json::object& sizeStats = sizeStats_.as_object(); BOOST_TEST_REQUIRE(sizeStats.contains("job_id")); BOOST_REQUIRE_EQUAL("job", sizeStats.at("job_id").as_string()); BOOST_TEST_REQUIRE(sizeStats.contains("model_bytes")); BOOST_REQUIRE_EQUAL(2, sizeStats.at("model_bytes").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(sizeStats.contains("peak_model_bytes")); BOOST_REQUIRE_EQUAL(4, sizeStats.at("peak_model_bytes").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(sizeStats.contains("total_by_field_count")); BOOST_REQUIRE_EQUAL(5, sizeStats.at("total_by_field_count").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(sizeStats.contains("total_partition_field_count")); BOOST_REQUIRE_EQUAL( 6, sizeStats.at("total_partition_field_count").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(sizeStats.contains("total_over_field_count")); BOOST_REQUIRE_EQUAL(7, sizeStats.at("total_over_field_count").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(sizeStats.contains("bucket_allocation_failures_count")); BOOST_REQUIRE_EQUAL( 8, sizeStats.at("bucket_allocation_failures_count").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(sizeStats.contains("timestamp")); BOOST_REQUIRE_EQUAL(9000, sizeStats.at("timestamp").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(sizeStats.contains("memory_status")); BOOST_REQUIRE_EQUAL("hard_limit", sizeStats.at("memory_status").as_string()); BOOST_TEST_REQUIRE(sizeStats.contains("assignment_memory_basis")); BOOST_REQUIRE_EQUAL("current_model_bytes", sizeStats.at("assignment_memory_basis").as_string()); BOOST_TEST_REQUIRE(sizeStats.contains("log_time")); std::int64_t nowMs{ml::core::CTimeUtils::nowMs()}; BOOST_TEST_REQUIRE(nowMs >= sizeStats.at("log_time").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(sizeStats.contains("model_bytes_exceeded")); BOOST_REQUIRE_EQUAL(10, sizeStats.at("model_bytes_exceeded").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(sizeStats.contains("model_bytes_memory_limit")); BOOST_REQUIRE_EQUAL( 11, sizeStats.at("model_bytes_memory_limit").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(sizeStats.contains("categorized_doc_count")); BOOST_REQUIRE_EQUAL(12, sizeStats.at("categorized_doc_count").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(sizeStats.contains("total_category_count")); BOOST_REQUIRE_EQUAL(13, sizeStats.at("total_category_count").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(sizeStats.contains("frequent_category_count")); BOOST_REQUIRE_EQUAL( 14, sizeStats.at("frequent_category_count").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(sizeStats.contains("rare_category_count")); BOOST_REQUIRE_EQUAL(15, sizeStats.at("rare_category_count").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(sizeStats.contains("dead_category_count")); BOOST_REQUIRE_EQUAL(16, sizeStats.at("dead_category_count").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(sizeStats.contains("failed_category_count")); BOOST_REQUIRE_EQUAL(17, sizeStats.at("failed_category_count").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(sizeStats.contains("categorization_status")); BOOST_REQUIRE_EQUAL("warn", sizeStats.at("categorization_status").as_string()); } BOOST_AUTO_TEST_CASE(testWriteCategorizerStats) { std::ostringstream sstream; { ml::core::CJsonOutputStreamWrapper outputStream(sstream); ml::api::CJsonOutputWriter writer("job", outputStream); ml::model::SCategorizerStats categorizerStats; categorizerStats.s_CategorizedMessages = 1; categorizerStats.s_TotalCategories = 2; categorizerStats.s_FrequentCategories = 3; categorizerStats.s_RareCategories = 4; categorizerStats.s_DeadCategories = 5; categorizerStats.s_MemoryCategorizationFailures = 6; categorizerStats.s_CategorizationStatus = ml::model_t::E_CategorizationStatusOk; writer.writeCategorizerStats("foo", "bar", categorizerStats, 7); writer.endOutputBatch(false, 1ul); } LOG_DEBUG(<< sstream.str()); json::error_code ec; json::value doc_ = json::parse(sstream.str(), ec); BOOST_TEST_REQUIRE(ec.failed() == false); BOOST_TEST_REQUIRE(doc_.is_array()); const json::array& doc = doc_.as_array(); const json::value& resourceWrapper_ = doc.at(std::size_t(0)); const json::object& resourceWrapper = resourceWrapper_.as_object(); BOOST_TEST_REQUIRE(resourceWrapper.contains("categorizer_stats")); const json::value& categorizerStats_ = resourceWrapper.at("categorizer_stats"); const json::object& categorizerStats = categorizerStats_.as_object(); BOOST_TEST_REQUIRE(categorizerStats.contains("job_id")); BOOST_REQUIRE_EQUAL("job", categorizerStats.at("job_id").as_string()); BOOST_TEST_REQUIRE(categorizerStats.contains("partition_field_name")); BOOST_REQUIRE_EQUAL("foo", categorizerStats.at("partition_field_name").as_string()); BOOST_TEST_REQUIRE(categorizerStats.contains("partition_field_value")); BOOST_REQUIRE_EQUAL("bar", categorizerStats.at("partition_field_value").as_string()); BOOST_TEST_REQUIRE(categorizerStats.contains("categorized_doc_count")); BOOST_REQUIRE_EQUAL( 1, categorizerStats.at("categorized_doc_count").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(categorizerStats.contains("total_category_count")); BOOST_REQUIRE_EQUAL( 2, categorizerStats.at("total_category_count").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(categorizerStats.contains("frequent_category_count")); BOOST_REQUIRE_EQUAL( 3, categorizerStats.at("frequent_category_count").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(categorizerStats.contains("rare_category_count")); BOOST_REQUIRE_EQUAL( 4, categorizerStats.at("rare_category_count").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(categorizerStats.contains("dead_category_count")); BOOST_REQUIRE_EQUAL( 5, categorizerStats.at("dead_category_count").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(categorizerStats.contains("failed_category_count")); BOOST_REQUIRE_EQUAL( 6, categorizerStats.at("failed_category_count").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(categorizerStats.contains("categorization_status")); BOOST_REQUIRE_EQUAL("ok", categorizerStats.at("categorization_status").as_string()); BOOST_TEST_REQUIRE(categorizerStats.contains("categorization_status")); BOOST_REQUIRE_EQUAL("ok", categorizerStats.at("categorization_status").as_string()); BOOST_TEST_REQUIRE(categorizerStats.contains("timestamp")); BOOST_REQUIRE_EQUAL(7000, categorizerStats.at("timestamp").to_number<std::int64_t>()); BOOST_TEST_REQUIRE(categorizerStats.contains("log_time")); std::int64_t nowMs{ml::core::CTimeUtils::nowMs()}; BOOST_TEST_REQUIRE(nowMs >= categorizerStats.at("log_time").to_number<std::int64_t>()); } BOOST_AUTO_TEST_CASE(testWriteScheduledEvent) { std::ostringstream sstream; { std::string partitionFieldName("tfn"); std::string partitionFieldValue("tfv"); std::string byFieldName("airline"); std::string byFieldValue("GAL"); std::string fieldName("responsetime"); std::string function("mean"); std::string functionDescription("mean(responsetime)"); std::string emptyString; ml::api::CHierarchicalResultsWriter::TOptionalStrOptionalStrPrDoublePrVec influences; ml::core::CJsonOutputStreamWrapper outputStream(sstream); ml::api::CJsonOutputWriter writer("job", outputStream); // This result has no scheduled events ml::api::CHierarchicalResultsWriter::SResults result( ml::api::CHierarchicalResultsWriter::E_SimpleCountResult, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, emptyString, 100, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 0.0, 0.1, 0.1, -5.0, fieldName, influences, false, true, 1, 100, EMPTY_STRING_LIST, {}); BOOST_TEST_REQUIRE(writer.acceptResult(result)); // This result has 2 scheduled events std::vector<std::string> eventDescriptions{"event-foo", "event-bar"}; ml::api::CHierarchicalResultsWriter::SResults result2( ml::api::CHierarchicalResultsWriter::E_SimpleCountResult, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, emptyString, 200, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 0.0, 0.1, 0.1, -5.0, fieldName, influences, false, true, 1, 100, eventDescriptions, {}); BOOST_TEST_REQUIRE(writer.acceptResult(result2)); BOOST_TEST_REQUIRE(writer.endOutputBatch(false, 1U)); } json::error_code ec; json::value doc_ = json::parse(sstream.str(), ec); BOOST_TEST_REQUIRE(ec.failed() == false); // Debug print record LOG_DEBUG(<< "Results:\n" << doc_); BOOST_TEST_REQUIRE(doc_.is_array()); const json::array& doc = doc_.as_array(); BOOST_REQUIRE_EQUAL(std::size_t(2), doc.size()); // the first bucket has no events const json::value& bucket_ = doc.at(std::size_t(1)).as_object().at("bucket"); const json::object& bucket = bucket_.as_object(); BOOST_TEST_REQUIRE(bucket.contains("scheduled_event") == false); const json::value& bucketWithEvents_ = doc.at(std::size_t(1)).at("bucket"); const json::object& bucketWithEvents = bucketWithEvents_.as_object(); BOOST_TEST_REQUIRE(bucketWithEvents.contains("scheduled_events")); const json::value& events_ = bucketWithEvents.at("scheduled_events"); BOOST_TEST_REQUIRE(events_.is_array()); const json::array& events = events_.as_array(); BOOST_REQUIRE_EQUAL(std::size_t(2), events.size()); BOOST_REQUIRE_EQUAL("event-foo", events.at(std::size_t(0)).as_string()); BOOST_REQUIRE_EQUAL("event-bar", events.at(std::size_t(1)).as_string()); } BOOST_AUTO_TEST_CASE(testThroughputWithScopedAllocator) { testThroughputHelper(true); } BOOST_AUTO_TEST_CASE(testThroughputWithoutScopedAllocator) { testThroughputHelper(false); } BOOST_AUTO_TEST_CASE(testRareAnomalyScoreExplanation) { // Ensure that anomaly score explanation fields for rare events // are outputted. std::ostringstream sstream; { ml::core::CJsonOutputStreamWrapper outputStream(sstream); ml::api::CJsonOutputWriter writer("job", outputStream); std::string partitionFieldName("Carrier"); std::string partitionFieldValue("JetBeats"); std::string overFieldName("pfn"); std::string overFieldValue("pfv"); std::string byFieldName("Dest"); std::string byFieldValue("Adelaide International Airport"); std::string correlatedByFieldValue("BAW"); std::string fieldName("clientip"); std::string function("rare"); std::string functionDescription("rare"); std::string emptyString; ml::api::CHierarchicalResultsWriter::TOptionalStrOptionalStrPrDoublePrVec influences; ml::api::CHierarchicalResultsWriter::TAnomalyScoreExplanation anomalyScoreExplanation; { anomalyScoreExplanation.s_ByFieldFirstOccurrence = true; anomalyScoreExplanation.s_ByFieldActualConcentration = 0.1; anomalyScoreExplanation.s_ByFieldTypicalConcentration = 0.5; ml::api::CHierarchicalResultsWriter::SResults result( ml::api::CHierarchicalResultsWriter::E_Result, partitionFieldName, partitionFieldValue, byFieldName, byFieldValue, correlatedByFieldValue, 1, function, functionDescription, 42.0, 79, TDouble1Vec(1, 6953.0), TDouble1Vec(1, 10090.0), 2.24, 0.8, 0.0, -5.0, fieldName, influences, false, false, 2, 100, EMPTY_STRING_LIST, anomalyScoreExplanation); // 1st bucket BOOST_TEST_REQUIRE(writer.acceptResult(result)); } // Finished adding results BOOST_TEST_REQUIRE(writer.endOutputBatch(false, 10U)); } json::error_code ec; json::value arrayDoc_ = json::parse(sstream.str(), ec); BOOST_TEST_REQUIRE(ec.failed() == false); // Debug print record LOG_DEBUG(<< "Results:\n" << arrayDoc_); BOOST_TEST_REQUIRE(arrayDoc_.is_array()); const json::array& arrayDoc = arrayDoc_.as_array(); BOOST_REQUIRE_EQUAL(std::size_t(2), arrayDoc.size()); BOOST_TEST_REQUIRE(arrayDoc.at(0).as_object().contains("records")); BOOST_TEST_REQUIRE(arrayDoc.at(0).as_object().at("records").is_array()); BOOST_REQUIRE_EQUAL(std::size_t(1), arrayDoc.at(0).as_object().at("records").as_array().size()); const auto& record = arrayDoc.at(0).as_object().at("records").as_array().at(0).as_object(); BOOST_TEST_REQUIRE(record.contains("anomaly_score_explanation")); BOOST_TEST_REQUIRE(record.at("anomaly_score_explanation").as_object().contains("by_field_first_occurrence")); BOOST_REQUIRE_EQUAL(true, record.at("anomaly_score_explanation") .as_object() .at("by_field_first_occurrence") .as_bool()); BOOST_TEST_REQUIRE(record.at("anomaly_score_explanation").as_object().contains("by_field_relative_rarity")); BOOST_REQUIRE_EQUAL(5.0, record.at("anomaly_score_explanation") .as_object() .at("by_field_relative_rarity") .to_number<double>()); } BOOST_AUTO_TEST_SUITE_END()