lib/model/unittest/CEventRateModelTest.cc

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the following additional limitation. Functionality enabled by the
 * files subject to the Elastic License 2.0 may only be used in production when
 * invoked by an Elasticsearch process with a license key installed that permits
 * use of machine learning features. You may not use this file except in
 * compliance with the Elastic License 2.0 and the foregoing additional
 * limitation.
 */

#include <core/CJsonStatePersistInserter.h>
#include <core/CJsonStateRestoreTraverser.h>
#include <core/CLogger.h>
#include <core/Constants.h>
#include <core/CoreTypes.h>

#include <maths/common/CBasicStatistics.h>
#include <maths/common/CIntegerTools.h>
#include <maths/common/CModelWeight.h>
#include <maths/common/CPrior.h>

#include <model/CAnnotatedProbability.h>
#include <model/CAnomalyDetectorModelConfig.h>
#include <model/CCountingModel.h>
#include <model/CDataGatherer.h>
#include <model/CDetectionRule.h>
#include <model/CEventData.h>
#include <model/CEventRateModel.h>
#include <model/CEventRateModelFactory.h>
#include <model/CEventRatePopulationModelFactory.h>
#include <model/CInterimBucketCorrector.h>
#include <model/CModelDetailsView.h>
#include <model/CModelFactory.h>
#include <model/CPartitioningFields.h>
#include <model/CResourceMonitor.h>
#include <model/CRuleCondition.h>

#include <test/BoostTestCloseAbsolute.h>
#include <test/CRandomNumbers.h>

#include "CModelTestFixtureBase.h"

#include <boost/test/unit_test.hpp>

#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <vector>

BOOST_TEST_DONT_PRINT_LOG_VALUE(CModelTestFixtureBase::TStrVec::iterator)

BOOST_AUTO_TEST_SUITE(CEventRateModelTest)

using namespace ml;
using namespace model;

namespace {
const std::string EMPTY_STRING;
const CModelTestFixtureBase::TSizeDoublePr1Vec NO_CORRELATES;
} // unnamed::

class CTestFixture : public CModelTestFixtureBase {
public:
    TUInt64Vec rawEventCounts(std::size_t copies = 1) {
        std::uint64_t counts[] = {54, 67, 39, 58, 46, 50, 42,
                                  48, 53, 51, 50, 57, 53, 49};
        TUInt64Vec result;
        for (std::size_t i = 0; i < copies; ++i) {
            result.insert(result.end(), std::begin(counts), std::end(counts));
        }
        return result;
    }

    void generateEvents(const core_t::TTime& startTime,
                        const core_t::TTime& bucketLength,
                        const TUInt64Vec& eventCountsPerBucket,
                        TTimeVec& eventArrivalTimes) {
        // Generate an ordered collection of event arrival times.
        test::CRandomNumbers rng;
        auto bucketStartTime = static_cast<double>(startTime);
        for (auto count : eventCountsPerBucket) {
            double bucketEndTime = bucketStartTime + static_cast<double>(bucketLength);

            TDoubleVec bucketEventTimes;
            rng.generateUniformSamples(bucketStartTime, bucketEndTime - 1.0,
                                       static_cast<std::size_t>(count), bucketEventTimes);
            std::sort(bucketEventTimes.begin(), bucketEventTimes.end());

            for (auto time_ : bucketEventTimes) {
                auto time = static_cast<core_t::TTime>(time_);
                time = std::min(static_cast<core_t::TTime>(bucketEndTime - 1.0),
                                std::max(static_cast<core_t::TTime>(bucketStartTime), time));
                eventArrivalTimes.push_back(time);
            }

            bucketStartTime = bucketEndTime;
        }
    }

    void generateSporadicEvents(const core_t::TTime& startTime,
                                const core_t::TTime& bucketLength,
                                const TUInt64Vec& nonZeroEventCountsPerBucket,
                                TTimeVec& eventArrivalTimes) {
        // Generate an ordered collection of event arrival times.
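        // Unlike generateEvents(), after filling each bucket this advances
        // the bucket start time by a random whole number of buckets, so the
        // resulting series contains empty buckets between the non-zero ones.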
        test::CRandomNumbers rng;
        auto bucketStartTime = static_cast<double>(startTime);
        for (auto count : nonZeroEventCountsPerBucket) {
            double bucketEndTime = bucketStartTime + static_cast<double>(bucketLength);

            TDoubleVec bucketEventTimes;
            rng.generateUniformSamples(bucketStartTime, bucketEndTime - 1.0,
                                       static_cast<std::size_t>(count), bucketEventTimes);
            std::sort(bucketEventTimes.begin(), bucketEventTimes.end());

            for (auto time_ : bucketEventTimes) {
                auto time = static_cast<core_t::TTime>(time_);
                time = std::min(static_cast<core_t::TTime>(bucketEndTime - 1.0),
                                std::max(static_cast<core_t::TTime>(bucketStartTime), time));
                eventArrivalTimes.push_back(time);
            }

            TDoubleVec gap;
            rng.generateUniformSamples(0.0, 10.0, 1, gap);
            bucketStartTime += static_cast<double>(bucketLength) * std::ceil(gap[0]);
        }
    }

    void handleEvent(const CDataGatherer::TStrCPtrVec& fields,
                     core_t::TTime time,
                     CModelFactory::TDataGathererPtr& gatherer,
                     CResourceMonitor& resourceMonitor) {
        CEventData eventResult;
        eventResult.time(time);
        gatherer->addArrival(fields, eventResult, resourceMonitor);
    }

    void testModelWithValueField(model_t::EFeature feature,
                                 TSizeVecVecVec& fields,
                                 TStrVec& strings,
                                 CResourceMonitor& resourceMonitor) {
        LOG_DEBUG(<< " *** testing feature " << model_t::print(feature));

        const core_t::TTime startTime{1346968800};
        const core_t::TTime bucketLength{3600};
        SModelParams params(bucketLength);
        auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength);
        CEventRatePopulationModelFactory factory(params, interimBucketCorrector);
        factory.features({feature});
        factory.fieldNames("", "", "P", "V", TStrVec());
        CModelFactory::TDataGathererPtr gatherer(factory.makeDataGatherer(startTime));
        CModelFactory::TModelPtr model(factory.makeModel(gatherer));
        BOOST_TEST_REQUIRE(model);

        std::size_t anomalousBucket{20};
        std::size_t numberBuckets{30};
        const core_t::TTime endTime = startTime + (numberBuckets * bucketLength);

        std::size_t i{0};
        for (core_t::TTime bucketStartTime = startTime; bucketStartTime < endTime;
             bucketStartTime += bucketLength, i++) {
            core_t::TTime bucketEndTime = bucketStartTime + bucketLength;

            for (std::size_t j = 0; j < fields[i].size(); ++j) {
                CDataGatherer::TStrCPtrVec f{&strings[fields[i][j][0]],
                                             &strings[fields[i][j][1]],
                                             &strings[fields[i][j][2]]};
                handleEvent(f, bucketStartTime + j, gatherer, resourceMonitor);
            }

            model->sample(bucketStartTime, bucketEndTime, resourceMonitor);

            SAnnotatedProbability annotatedProbability;
            CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING);
            model->computeProbability(0 /*pid*/, bucketStartTime, bucketEndTime,
                                      partitioningFields, 1, annotatedProbability);
            LOG_DEBUG(<< "probability = " << annotatedProbability.s_Probability);
            if (i == anomalousBucket) {
                BOOST_TEST_REQUIRE(annotatedProbability.s_Probability < 0.001);
            } else {
                BOOST_TEST_REQUIRE(annotatedProbability.s_Probability > 0.6);
            }
        }
    }

    void makeModel(const SModelParams& params,
                   const model_t::TFeatureVec& features,
                   core_t::TTime startTime,
                   std::size_t numberPeople,
                   const std::string& summaryCountField = EMPTY_STRING) {
        this->makeModelT<CEventRateModelFactory>(params, features, startTime,
                                                 model_t::E_EventRateOnline, m_Gatherer,
                                                 m_Model, {}, summaryCountField);
        for (std::size_t i = 0; i < numberPeople; ++i) {
            BOOST_REQUIRE_EQUAL(i, this->addPerson("p" + core::CStringUtils::typeToString(i + 1),
                                                   m_Gatherer));
        }
    }

protected:
    using TDoubleSizeStrTr = core::CTriple<double, std::size_t, std::string>;
    using TMinAccumulator =
        maths::common::CBasicStatistics::COrderStatisticsHeap<TDoubleSizeStrTr>;
    using TMinAccumulatorVec = std::vector<TMinAccumulator>;
};

BOOST_FIXTURE_TEST_CASE(testCountSample, CTestFixture) {
    const core_t::TTime startTime{1346968800};
    const core_t::TTime bucketLength{3600};
    SModelParams params(bucketLength);
    params.s_InitialDecayRateMultiplier = 1.0;
    this->makeModel(params, {model_t::E_IndividualCountByBucketAndPerson}, startTime, 1);
    auto* model = dynamic_cast<CEventRateModel*>(m_Model.get());
    BOOST_TEST_REQUIRE(model);

    TMathsModelPtr timeseriesModel{m_Factory->defaultFeatureModel(
        model_t::E_IndividualCountByBucketAndPerson, bucketLength, 0.4, true)};
    maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec weights{
        maths_t::CUnitWeights::unit<TDouble2Vec>(1)};

    // Generate some events.
    TTimeVec eventTimes;
    TUInt64Vec expectedEventCounts(rawEventCounts());
    generateEvents(startTime, bucketLength, expectedEventCounts, eventTimes);
    core_t::TTime endTime = (eventTimes.back() / bucketLength + 1) * bucketLength;
    LOG_DEBUG(<< "startTime = " << startTime << ", endTime = " << endTime
              << ", # events = " << eventTimes.size());

    std::size_t i{0};
    std::size_t j{0};
    for (core_t::TTime bucketStartTime = startTime; bucketStartTime < endTime;
         bucketStartTime += bucketLength, ++j) {
        core_t::TTime bucketEndTime = bucketStartTime + bucketLength;

        for (/**/; i < eventTimes.size() && eventTimes[i] < bucketEndTime; ++i) {
            this->addArrival(SMessage(eventTimes[i], "p1", TOptionalDouble()), m_Gatherer);
        }

        model->sample(bucketStartTime, bucketEndTime, m_ResourceMonitor);

        maths::common::CModelAddSamplesParams params_;
        params_.isInteger(true)
            .isNonNegative(true)
            .propagationInterval(1.0)
            .trendWeights(weights)
            .priorWeights(weights)
            .bucketOccupancy(1.0)
            .firstValueTime(startTime);
        double sample{static_cast<double>(expectedEventCounts[j])};
        maths::common::CModel::TTimeDouble2VecSizeTrVec expectedSamples{core::make_triple(
            (bucketStartTime + bucketEndTime) / 2,
            maths::common::CModel::TDouble2Vec{sample}, std::size_t{0})};
        timeseriesModel->addSamples(params_, expectedSamples);

        // Test we sample the data correctly.
        BOOST_REQUIRE_EQUAL(expectedEventCounts[j],
                            static_cast<std::uint64_t>(model->currentBucketValue(
                                model_t::E_IndividualCountByBucketAndPerson, 0, 0,
                                bucketStartTime)[0]));
        BOOST_REQUIRE_EQUAL(timeseriesModel->checksum(),
                            model->details()
                                ->model(model_t::E_IndividualCountByBucketAndPerson, 0)
                                ->checksum());
    }

    // Test persistence. (We check for idempotency.)
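    // Persist the model state to JSON, restore it into a new model, then
    // persist that again: the restored model must produce identical state
    // and checksums.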
    std::ostringstream origJson;
    core::CJsonStatePersistInserter::persist(
        origJson, [&model](core::CJsonStatePersistInserter& inserter) {
            model->acceptPersistInserter(inserter);
        });

    LOG_TRACE(<< "origJson = " << origJson.str());
    LOG_DEBUG(<< "origJson size = " << origJson.str().size());
    BOOST_TEST_REQUIRE(origJson.str().size() < 41000);

    // Restore the JSON into a new model.
    // The traverser expects the state JSON in an embedded document.
    std::istringstream origJsonStrm("{\"topLevel\" : " + origJson.str() + "}");
    core::CJsonStateRestoreTraverser traverser(origJsonStrm);
    CModelFactory::TModelPtr restoredModelPtr(m_Factory->makeModel(m_Gatherer, traverser));

    // The JSON representation of the restored model should match the original.
    std::ostringstream newJson;
    core::CJsonStatePersistInserter::persist(
        newJson, [&restoredModelPtr](core::CJsonStatePersistInserter& inserter) {
            restoredModelPtr->acceptPersistInserter(inserter);
        });

    LOG_DEBUG(<< "original checksum = " << model->checksum(false));
    LOG_DEBUG(<< "restored checksum = " << restoredModelPtr->checksum(false));
    BOOST_REQUIRE_EQUAL(model->checksum(false), restoredModelPtr->checksum(false));
    BOOST_REQUIRE_EQUAL(origJson.str(), newJson.str());
}

BOOST_FIXTURE_TEST_CASE(testNonZeroCountSample, CTestFixture) {
    const core_t::TTime startTime{1346968800};
    const core_t::TTime bucketLength{3600};
    SModelParams params(bucketLength);
    params.s_InitialDecayRateMultiplier = 1.0;
    this->makeModel(params, {model_t::E_IndividualNonZeroCountByBucketAndPerson}, startTime, 1);
    auto* model = dynamic_cast<CEventRateModel*>(m_Model.get());
    BOOST_TEST_REQUIRE(model);

    TMathsModelPtr timeseriesModel{m_Factory->defaultFeatureModel(
        model_t::E_IndividualNonZeroCountByBucketAndPerson, bucketLength, 0.4, true)};
    maths::common::CModelAddSamplesParams::TDouble2VecWeightsAryVec weights{
        maths_t::CUnitWeights::unit<TDouble2Vec>(1)};

    // Generate some events.
    TTimeVec eventTimes;
    TUInt64Vec expectedEventCounts = rawEventCounts();
    generateSporadicEvents(startTime, bucketLength, expectedEventCounts, eventTimes);
    core_t::TTime endTime = (eventTimes.back() / bucketLength + 1) * bucketLength;
    LOG_DEBUG(<< "startTime = " << startTime << ", endTime = " << endTime
              << ", # events = " << eventTimes.size());

    std::size_t i{0};
    std::size_t j{0};
    for (core_t::TTime bucketStartTime = startTime; bucketStartTime < endTime;
         bucketStartTime += bucketLength) {
        core_t::TTime bucketEndTime = bucketStartTime + bucketLength;

        for (; i < eventTimes.size() && eventTimes[i] < bucketEndTime; ++i) {
            this->addArrival(SMessage(eventTimes[i], "p1", TOptionalDouble()), m_Gatherer);
        }

        model->sample(bucketStartTime, bucketEndTime, m_ResourceMonitor);

        if (*model->currentBucketCount(0, bucketStartTime) > 0) {
            maths::common::CModelAddSamplesParams params_;
            params_.isInteger(true)
                .isNonNegative(true)
                .propagationInterval(1.0)
                .trendWeights(weights)
                .priorWeights(weights)
                .firstValueTime(startTime);
            double sample{static_cast<double>(model_t::offsetCountToZero(
                model_t::E_IndividualNonZeroCountByBucketAndPerson,
                static_cast<double>(expectedEventCounts[j])))};
            maths::common::CModel::TTimeDouble2VecSizeTrVec expectedSamples{core::make_triple(
                (bucketStartTime + bucketEndTime) / 2,
                maths::common::CModel::TDouble2Vec{sample}, std::size_t{0})};
            timeseriesModel->addSamples(params_, expectedSamples);

            // Test we sample the data correctly.
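            // The bucket value should equal the raw event count, and the
            // feature model should checksum-match the reference model fed
            // the same offset samples.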
            BOOST_REQUIRE_EQUAL(expectedEventCounts[j],
                                static_cast<std::uint64_t>(model->currentBucketValue(
                                    model_t::E_IndividualNonZeroCountByBucketAndPerson,
                                    0, 0, bucketStartTime)[0]));
            BOOST_REQUIRE_EQUAL(timeseriesModel->checksum(),
                                model->details()
                                    ->model(model_t::E_IndividualNonZeroCountByBucketAndPerson, 0)
                                    ->checksum());
            ++j;
        }
    }
}

BOOST_FIXTURE_TEST_CASE(testRare, CTestFixture) {
    const core_t::TTime startTime{1346968800};
    const core_t::TTime bucketLength{3600};

    SModelParams params(bucketLength);
    this->makeModel(params,
                    {model_t::E_IndividualTotalBucketCountByPerson,
                     model_t::E_IndividualIndicatorOfBucketPerson},
                    startTime, 5);
    auto* model = dynamic_cast<CEventRateModel*>(m_Model.get());

    core_t::TTime time{startTime};
    for (/**/; time < startTime + 10 * bucketLength; time += bucketLength) {
        this->addArrival(SMessage(time + bucketLength / 2, "p1", TOptionalDouble()), m_Gatherer);
        this->addArrival(SMessage(time + bucketLength / 2, "p2", TOptionalDouble()), m_Gatherer);
        model->sample(time, time + bucketLength, m_ResourceMonitor);
    }
    for (/**/; time < startTime + 50 * bucketLength; time += bucketLength) {
        this->addArrival(SMessage(time + bucketLength / 2, "p1", TOptionalDouble()), m_Gatherer);
        this->addArrival(SMessage(time + bucketLength / 2, "p2", TOptionalDouble()), m_Gatherer);
        this->addArrival(SMessage(time + bucketLength / 2, "p3", TOptionalDouble()), m_Gatherer);
        this->addArrival(SMessage(time + bucketLength / 2, "p4", TOptionalDouble()), m_Gatherer);
        model->sample(time, time + bucketLength, m_ResourceMonitor);
    }
    this->addArrival(SMessage(time + bucketLength / 2, "p1", TOptionalDouble()), m_Gatherer);
    this->addArrival(SMessage(time + bucketLength / 2, "p2", TOptionalDouble()), m_Gatherer);
    this->addArrival(SMessage(time + bucketLength / 2, "p3", TOptionalDouble()), m_Gatherer);
    this->addArrival(SMessage(time + bucketLength / 2, "p4", TOptionalDouble()), m_Gatherer);
    this->addArrival(SMessage(time + bucketLength / 2, "p5", TOptionalDouble()), m_Gatherer);
    model->sample(time, time + bucketLength, m_ResourceMonitor);

    TDoubleVec probabilities;
    for (std::size_t pid = 0; pid < 5; ++pid) {
        SAnnotatedProbability annotatedProbability;
        CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING);
        BOOST_TEST_REQUIRE(model->computeProbability(
            pid, time, time + bucketLength, partitioningFields, 0, annotatedProbability));
        LOG_DEBUG(<< "probability = " << annotatedProbability.s_Probability);
        LOG_DEBUG(<< "anomaly score explanation = "
                  << annotatedProbability.s_AnomalyScoreExplanation);
        probabilities.push_back(annotatedProbability.s_Probability);
    }

    // We expect "p1 = p2 > p3 = p4 >> p5".
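    // p1 and p2 have been present from the start, p3 and p4 only for the
    // last 40 buckets, and p5 appears for the first time in the final
    // bucket, so it should be by far the most unusual.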
    BOOST_REQUIRE_EQUAL(5, probabilities.size());
    BOOST_REQUIRE_EQUAL(probabilities[0], probabilities[1]);
    BOOST_TEST_REQUIRE(probabilities[1] > probabilities[2]);
    BOOST_REQUIRE_EQUAL(probabilities[2], probabilities[3]);
    BOOST_TEST_REQUIRE(probabilities[3] > 50.0 * probabilities[4]);

    std::ostringstream origJson;
    core::CJsonStatePersistInserter::persist(
        origJson, [&model](core::CJsonStatePersistInserter& inserter) {
            model->acceptPersistInserter(inserter);
        });

    LOG_TRACE(<< "origJson = " << origJson.str());
    LOG_DEBUG(<< "origJson size = " << origJson.str().size());
    BOOST_TEST_REQUIRE(origJson.str().size() < 22000);

    // Restore the JSON into a new model.
    // The traverser expects the state JSON in an embedded document.
    std::istringstream origJsonStrm{"{\"topLevel\" : " + origJson.str() + "}"};
    core::CJsonStateRestoreTraverser traverser(origJsonStrm);
    CModelFactory::TModelPtr restoredModelPtr(m_Factory->makeModel(m_Gatherer, traverser));

    // The JSON representation of the restored model should match the original.
    std::ostringstream newJson;
    core::CJsonStatePersistInserter::persist(
        newJson, [&restoredModelPtr](core::CJsonStatePersistInserter& inserter) {
            restoredModelPtr->acceptPersistInserter(inserter);
        });
    BOOST_REQUIRE_EQUAL(origJson.str(), newJson.str());
}

BOOST_FIXTURE_TEST_CASE(testProbabilityCalculation, CTestFixture) {
    using TDoubleSizeAnnotatedProbabilityTr =
        core::CTriple<double, std::size_t, SAnnotatedProbability>;
    using TMinAccumulator = maths::common::CBasicStatistics::COrderStatisticsHeap<
        TDoubleSizeAnnotatedProbabilityTr,
        std::function<bool(const TDoubleSizeAnnotatedProbabilityTr&,
                           const TDoubleSizeAnnotatedProbabilityTr&)>>;

    const core_t::TTime startTime{1346968800};
    const core_t::TTime bucketLength{3600};

    TSizeVecVec anomalousBuckets{{25}, {24, 25, 26, 27}};
    TDoubleVec anomalousBucketsRateMultipliers{3.0, 1.3};

    for (std::size_t t = 0; t < 2; ++t) {
        // Create the model.
        SModelParams params(bucketLength);
        params.s_DecayRate = 0.001;
        this->makeModel(params, {model_t::E_IndividualCountByBucketAndPerson}, startTime, 1);
        auto* model = dynamic_cast<CEventRateModel*>(m_Model.get());

        // Generate some events.
        TTimeVec eventTimes;
        TUInt64Vec expectedEventCounts = rawEventCounts(2);
        for (auto i : anomalousBuckets[t]) {
            expectedEventCounts[i] =
                static_cast<std::size_t>(static_cast<double>(expectedEventCounts[i]) *
                                         anomalousBucketsRateMultipliers[t]);
        }
        generateEvents(startTime, bucketLength, expectedEventCounts, eventTimes);
        core_t::TTime endTime = (eventTimes.back() / bucketLength + 1) * bucketLength;
        LOG_DEBUG(<< "startTime = " << startTime << ", endTime = " << endTime
                  << ", # events = " << eventTimes.size());

        // Play the data through the model and get the lowest probability buckets.
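        // An order statistics heap tracks the two smallest probabilities
        // seen, together with the bucket index and annotated result.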
        TMinAccumulator minProbabilities(
            2, TDoubleSizeAnnotatedProbabilityTr{},
            [](const TDoubleSizeAnnotatedProbabilityTr& lhs,
               const TDoubleSizeAnnotatedProbabilityTr& rhs) {
                return lhs.first < rhs.first;
            });
        std::size_t i{0};
        for (core_t::TTime j = 0, bucketStartTime = startTime; bucketStartTime < endTime;
             bucketStartTime += bucketLength, ++j) {
            core_t::TTime bucketEndTime = bucketStartTime + bucketLength;

            double count{0.0};
            for (; i < eventTimes.size() && eventTimes[i] < bucketEndTime; ++i) {
                this->addArrival(SMessage(eventTimes[i], "p1", TOptionalDouble()), m_Gatherer);
                count += 1.0;
            }

            model->sample(bucketStartTime, bucketEndTime, m_ResourceMonitor);

            SAnnotatedProbability p;
            CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING);
            BOOST_TEST_REQUIRE(model->computeProbability(
                0 /*pid*/, bucketStartTime, bucketEndTime, partitioningFields, 1, p));
            LOG_DEBUG(<< "bucket count = " << count
                      << ", probability = " << p.s_Probability);
            minProbabilities.add({p.s_Probability, static_cast<std::size_t>(j), p});
        }

        minProbabilities.sort();
        if (anomalousBuckets[t].size() == 1) {
            // Check the one anomalous bucket has the lowest probability by a
            // significant margin.
            BOOST_REQUIRE_EQUAL(anomalousBuckets[0][0], minProbabilities[0].second);
            BOOST_TEST_REQUIRE(minProbabilities[0].first / minProbabilities[1].first < 0.1);
        } else {
            // Check the multi-bucket impact values are relatively high
            // (indicating a large contribution from multi-bucket analysis).
            double expectedMultiBucketImpactThresholds[2]{0.3, 2.5};
            for (int j = 0; j < 2; ++j) {
                double multiBucketImpact = minProbabilities[j].third.s_MultiBucketImpact;
                LOG_DEBUG(<< "multi_bucket_impact = " << multiBucketImpact);
                BOOST_TEST_REQUIRE(multiBucketImpact >=
                                   expectedMultiBucketImpactThresholds[j]);
                BOOST_TEST_REQUIRE(
                    multiBucketImpact <=
                    CAnomalyDetectorModelConfig::MAXIMUM_MULTI_BUCKET_IMPACT_MAGNITUDE);
            }
        }
    }
}

BOOST_FIXTURE_TEST_CASE(testProbabilityCalculationForLowNonZeroCount, CTestFixture) {
    core_t::TTime startTime{0};
    core_t::TTime bucketLength{100};
    std::size_t lowNonZeroCountBucket{6};
    std::size_t highNonZeroCountBucket{8};

    TSizeVec bucketCounts{50, 50, 50, 50, 50, 0, 0, 0, 50, 1, 50, 100, 50, 50};

    SModelParams params(bucketLength);
    params.s_DecayRate = 0.001;
    this->makeModel(params, {model_t::E_IndividualLowNonZeroCountByBucketAndPerson},
                    startTime, 1);
    auto* model = dynamic_cast<CEventRateModel*>(m_Model.get());

    TDoubleVec probabilities;
    core_t::TTime time{startTime};
    for (auto count : bucketCounts) {
        LOG_DEBUG(<< "Writing " << count << " values");
        for (std::size_t i = 0; i < count; ++i) {
            this->addArrival(
                SMessage(time + static_cast<core_t::TTime>(i), "p1", TOptionalDouble()),
                m_Gatherer);
        }
        model->sample(time, time + bucketLength, m_ResourceMonitor);

        SAnnotatedProbability p;
        CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING);
        if (model->computeProbability(0 /*pid*/, time, time + bucketLength,
                                      partitioningFields, 0, p) == false) {
            continue;
        }
        LOG_DEBUG(<< "probability = " << p.s_Probability);
        if (*model->currentBucketCount(0, time) > 0) {
            probabilities.push_back(p.s_Probability);
        }

        time += bucketLength;
    }

    LOG_DEBUG(<< "probabilities = " << probabilities);
    BOOST_REQUIRE_EQUAL(11, probabilities.size());
    BOOST_TEST_REQUIRE(probabilities[lowNonZeroCountBucket] < 0.06);
    BOOST_TEST_REQUIRE(probabilities[highNonZeroCountBucket] > 0.9);
}

BOOST_FIXTURE_TEST_CASE(testProbabilityCalculationForHighNonZeroCount, CTestFixture) {
    core_t::TTime startTime{0};
    core_t::TTime bucketLength{100};
    std::size_t lowNonZeroCountBucket{6};
    std::size_t
        highNonZeroCountBucket{8};

    TSizeVec bucketCounts{50, 50, 50, 50, 50, 0, 0, 0, 50, 100, 50, 1, 50, 50};

    SModelParams params(bucketLength);
    params.s_DecayRate = 0.001;
    this->makeModel(params, {model_t::E_IndividualHighNonZeroCountByBucketAndPerson},
                    startTime, 1);
    auto* model = dynamic_cast<CEventRateModel*>(m_Model.get());

    TDoubleVec probabilities;
    core_t::TTime time{startTime};
    for (auto count : bucketCounts) {
        LOG_DEBUG(<< "Writing " << count << " values");
        for (std::size_t i = 0; i < count; ++i) {
            this->addArrival(
                SMessage(time + static_cast<core_t::TTime>(i), "p1", TOptionalDouble()),
                m_Gatherer);
        }
        model->sample(time, time + bucketLength, m_ResourceMonitor);

        SAnnotatedProbability p;
        CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING);
        if (model->computeProbability(0 /*pid*/, time, time + bucketLength,
                                      partitioningFields, 1, p) == false) {
            continue;
        }
        LOG_DEBUG(<< "probability = " << p.s_Probability);
        if (*model->currentBucketCount(0, time) > 0) {
            probabilities.push_back(p.s_Probability);
        }

        time += bucketLength;
    }

    LOG_DEBUG(<< "probabilities = " << probabilities);
    BOOST_REQUIRE_EQUAL(11, probabilities.size());
    BOOST_TEST_REQUIRE(probabilities[lowNonZeroCountBucket] < 0.06);
    BOOST_TEST_REQUIRE(probabilities[highNonZeroCountBucket] > 0.9);
}

BOOST_FIXTURE_TEST_CASE(testCorrelatedNoTrend, CTestFixture) {
    // Check we find the correct correlated variables, and identify
    // correlate and marginal anomalies.

    const core_t::TTime startTime{1346968800};
    const core_t::TTime bucketLength{3600};

    test::CRandomNumbers rng;

    const std::size_t numberBuckets{200};
    const TDoubleVec means{20.0, 25.0, 100.0, 800.0};
    const TDoubleVecVec covariances{{3.0, 2.5, 0.0, 0.0},
                                    {2.5, 4.0, 0.0, 0.0},
                                    {0.0, 0.0, 100.0, -500.0},
                                    {0.0, 0.0, -500.0, 3000.0}};
    TDoubleVecVec samples;
    rng.generateMultivariateNormalSamples(means, covariances, numberBuckets, samples);

    {
        SModelParams params(bucketLength);
        params.s_DecayRate = 0.001;
        params.s_LearnRate = 1.0;
        params.s_MinimumModeFraction =
            CAnomalyDetectorModelConfig::DEFAULT_INDIVIDUAL_MINIMUM_MODE_FRACTION;
        params.s_MinimumModeCount = 24.0;
        params.s_MultivariateByFields = true;
        this->makeModel(params, {model_t::E_IndividualCountByBucketAndPerson}, startTime, 4);
        auto* model = dynamic_cast<CEventRateModel*>(m_Model.get());
        BOOST_TEST_REQUIRE(model);

        LOG_DEBUG(<< "Test correlation anomalies");

        TSizeVec anomalyBuckets{100, 160, 190, numberBuckets};
        TDoubleVecVec anomalies{{-5.73, 4.29, 0.0, 0.0},
                                {0.0, 0.0, 89.99, 15.38},
                                {-7.73, 5.59, 52.99, 9.03}};
        TMinAccumulatorVec probabilities{TMinAccumulator{2}, TMinAccumulator{2},
                                         TMinAccumulator{2}, TMinAccumulator{2}};

        core_t::TTime time{startTime};
        for (std::size_t i = 0, anomaly = 0; i < numberBuckets; ++i) {
            for (std::size_t j = 0; j < samples[i].size(); ++j) {
                std::string person = std::string("p") +
                                     core::CStringUtils::typeToString(j + 1);
                double n = samples[i][j];
                if (i == anomalyBuckets[anomaly]) {
                    n += anomalies[anomaly][j];
                }
                for (std::size_t k = 0; k < static_cast<std::size_t>(n); ++k) {
                    this->addArrival(SMessage(time + static_cast<core_t::TTime>(j),
                                              person, TOptionalDouble()),
                                     m_Gatherer);
                }
            }
            if (i == anomalyBuckets[anomaly]) {
                ++anomaly;
            }

            model->sample(time, time + bucketLength, m_ResourceMonitor);

            for (std::size_t pid = 0; pid < samples[i].size(); ++pid) {
                SAnnotatedProbability p;
                CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING);
                BOOST_TEST_REQUIRE(model->computeProbability(
                    pid, time, time + bucketLength, partitioningFields, 1, p));
                std::string correlated;
                if
                    (p.s_AttributeProbabilities[0].s_CorrelatedAttributes.size() > 0 &&
                     p.s_AttributeProbabilities[0].s_CorrelatedAttributes[0] &&
                     p.s_AttributeProbabilities[0].s_Type.isUnconditional() == false) {
                    correlated = *p.s_AttributeProbabilities[0].s_CorrelatedAttributes[0];
                }
                probabilities[pid].add(TDoubleSizeStrTr(p.s_Probability, i, correlated));
            }

            time += bucketLength;
        }

        TStrVec expectedResults{"[(100,p2), (190,p2)]", "[(100,p1), (190,p1)]",
                                "[(160,p4), (190,p4)]", "[(160,p3), (190,p3)]"};
        for (std::size_t i = 0; i < probabilities.size(); ++i) {
            LOG_DEBUG(<< "probabilities = " << probabilities[i]);
            std::string results[2];
            for (std::size_t j = 0; j < 2; ++j) {
                results[j] = std::string("(") +
                             core::CStringUtils::typeToString(probabilities[i][j].second) +
                             "," + probabilities[i][j].third + ")";
            }
            std::sort(results, results + 2);
            BOOST_REQUIRE_EQUAL(expectedResults[i], core::CContainerPrinter::print(results));
        }

        // Test persist and restore with correlate models.
        std::ostringstream origJson;
        core::CJsonStatePersistInserter::persist(
            origJson, [&model](core::CJsonStatePersistInserter& inserter) {
                model->acceptPersistInserter(inserter);
            });

        LOG_TRACE(<< "origJson = " << origJson.str());
        LOG_DEBUG(<< "origJson size = " << origJson.str().size());
        BOOST_TEST_REQUIRE(origJson.str().size() < 195000);

        // The traverser expects the state JSON in an embedded document.
        std::istringstream origJsonStrm{"{\"topLevel\" : " + origJson.str() + "}"};
        core::CJsonStateRestoreTraverser traverser(origJsonStrm);
        CModelFactory::TModelPtr restoredModel(m_Factory->makeModel(m_Gatherer, traverser));

        std::ostringstream newJson;
        core::CJsonStatePersistInserter::persist(
            newJson, [&restoredModel](core::CJsonStatePersistInserter& inserter) {
                restoredModel->acceptPersistInserter(inserter);
            });
        BOOST_REQUIRE_EQUAL(origJson.str(), newJson.str());
    }
    {
        LOG_DEBUG(<< "Test marginal anomalies");

        SModelParams params(bucketLength);
        params.s_DecayRate = 0.001;
        params.s_LearnRate = 1.0;
        params.s_MinimumModeFraction =
            CAnomalyDetectorModelConfig::DEFAULT_INDIVIDUAL_MINIMUM_MODE_FRACTION;
        params.s_MinimumModeCount = 24.0;
        params.s_MultivariateByFields = true;
        this->makeModel(params, {model_t::E_IndividualCountByBucketAndPerson}, startTime, 4);
        auto* model = dynamic_cast<CEventRateModel*>(m_Model.get());
        BOOST_TEST_REQUIRE(model);

        TSizeVec anomalyBuckets{100, 160, 190, numberBuckets};
        TDoubleVecVec anomalies{{11.07, 14.19, 0.0, 0.0},
                                {0.0, 0.0, -66.9, 399.95},
                                {11.07, 14.19, -48.15, 329.95}};
        TMinAccumulatorVec probabilities{TMinAccumulator{3}, TMinAccumulator{3},
                                         TMinAccumulator{3}, TMinAccumulator{3}};

        core_t::TTime time{startTime};
        for (std::size_t i = 0, anomaly = 0; i < numberBuckets; ++i) {
            for (std::size_t j = 0; j < samples[i].size(); ++j) {
                std::string person = std::string("p") +
                                     core::CStringUtils::typeToString(j + 1);
                double n = samples[i][j];
                if (i == anomalyBuckets[anomaly]) {
                    n += anomalies[anomaly][j];
                }
                n = std::max(n, 0.0);
                for (std::size_t k = 0; k < static_cast<std::size_t>(n); ++k) {
                    this->addArrival(SMessage(time + static_cast<core_t::TTime>(j),
                                              person, TOptionalDouble()),
                                     m_Gatherer);
                }
            }
            if (i == anomalyBuckets[anomaly]) {
                ++anomaly;
            }

            model->sample(time, time + bucketLength, m_ResourceMonitor);

            for (std::size_t pid = 0; pid < samples[i].size(); ++pid) {
                SAnnotatedProbability p;
                CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING);
                BOOST_TEST_REQUIRE(model->computeProbability(
                    pid, time, time + bucketLength, partitioningFields, 1, p));
                std::string correlated;
                if
                    (p.s_AttributeProbabilities[0].s_CorrelatedAttributes.size() > 0 &&
                     p.s_AttributeProbabilities[0].s_CorrelatedAttributes[0] &&
                     p.s_AttributeProbabilities[0].s_Type.isUnconditional() == false) {
                    correlated = *p.s_AttributeProbabilities[0].s_CorrelatedAttributes[0];
                }
                probabilities[pid].add(TDoubleSizeStrTr(p.s_Probability, i, correlated));
            }

            time += bucketLength;
        }

        TStrVecVec expectedResults{
            {"100,", "190,"}, {"100,", "190,"}, {"160,", "190,"}, {"160,", "190,"}};
        for (std::size_t i = 0; i < probabilities.size(); ++i) {
            LOG_DEBUG(<< "probabilities = " << probabilities[i]);
            TStrVec results;
            for (const auto& result : probabilities[i]) {
                results.push_back(core::CStringUtils::typeToString(result.second) +
                                  "," + result.third);
            }
            for (const auto& expectedResult : expectedResults[i]) {
                BOOST_TEST_REQUIRE(std::find(results.begin(), results.end(),
                                             expectedResult) != results.end());
            }
        }
    }
}

BOOST_FIXTURE_TEST_CASE(testCorrelatedTrend, CTestFixture) {
    // Check we find the correct correlated variables, and identify
    // correlate and marginal anomalies.

    const core_t::TTime startTime{1346968800};
    const core_t::TTime bucketLength{600};

    test::CRandomNumbers rng;
    rng.discard(200000);

    const std::size_t numberBuckets{2880};
    const TDoubleVec means{20.0, 25.0, 50.0, 100.0};
    const TDoubleVecVec covariances{{20.0, 10.0, 0.0, 0.0},
                                    {10.0, 30.0, 0.0, 0.0},
                                    {0.0, 0.0, 40.0, -30.0},
                                    {0.0, 0.0, -30.0, 40.0}};
    const TDoubleVecVec trends{
        {0.0, 0.0, 0.0, 1.0, 1.0, 2.0, 4.0, 10.0, 11.0, 10.0, 8.0, 8.0,
         7.0, 9.0, 12.0, 4.0, 3.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0},
        {0.0, 0.0, 0.0, 2.0, 2.0, 4.0, 8.0, 15.0, 18.0, 14.0, 12.0, 12.0,
         11.0, 10.0, 16.0, 7.0, 4.0, 2.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0},
        {4.0, 3.0, 5.0, 20.0, 20.0, 40.0, 80.0, 150.0, 180.0, 140.0, 120.0, 120.0,
         110.0, 100.0, 160.0, 70.0, 40.0, 20.0, 10.0, 3.0, 5.0, 2.0, 1.0, 3.0},
        {0.0, 0.0, 0.0, 20.0, 20.0, 40.0, 80.0, 150.0, 180.0, 140.0, 120.0, 120.0,
         110.0, 100.0, 160.0, 70.0, 40.0, 40.0, 30.0, 20.0, 10.0, 0.0, 0.0, 0.0},
    };

    TDoubleVecVec samples;
    rng.generateMultivariateNormalSamples(means, covariances, numberBuckets, samples);

    TSizeVec anomalyBuckets{1950, 2400, 2700, numberBuckets};
    TDoubleVecVec anomalies{{-23.9, 19.7, 0.0, 0.0},
                            {0.0, 0.0, 36.4, 36.4},
                            {-28.7, 30.4, 36.4, 36.4}};
    TMinAccumulatorVec probabilities{TMinAccumulator{4}, TMinAccumulator{4},
                                     TMinAccumulator{4}, TMinAccumulator{4}};

    SModelParams params(bucketLength);
    params.s_DecayRate = 0.0002;
    params.s_LearnRate = 1.0;
    params.s_MinimumModeFraction =
        CAnomalyDetectorModelConfig::DEFAULT_INDIVIDUAL_MINIMUM_MODE_FRACTION;
    params.s_MinimumModeCount = 24.0;
    params.s_MultivariateByFields = true;
    this->makeModel(params, {model_t::E_IndividualCountByBucketAndPerson}, startTime, 4);
    auto* model = dynamic_cast<CEventRateModel*>(m_Model.get());
    BOOST_TEST_REQUIRE(model);

    core_t::TTime time{startTime};
    for (std::size_t i = 0, anomaly = 0; i < numberBuckets; ++i) {
        std::size_t hour1{static_cast<std::size_t>((time / 3600) % 24)};
        std::size_t hour2{(hour1 + 1) % 24};
        double dt{static_cast<double>(time % 3600) / 3600.0};
        for (std::size_t j = 0; j < samples[i].size(); ++j) {
            std::string person{std::string("p") + core::CStringUtils::typeToString(j + 1)};
            double n{std::max((1.0 - dt) * trends[j][hour1] + dt * trends[j][hour2] +
                                  samples[i][j] +
                                  (i == anomalyBuckets[anomaly] ?
                                       anomalies[anomaly][j] : 0.0),
                              0.0) /
                     3.0};
            for (std::size_t k = 0; k < static_cast<std::size_t>(n); ++k) {
                this->addArrival(SMessage(time + static_cast<core_t::TTime>(j),
                                          person, TOptionalDouble()),
                                 m_Gatherer);
            }
        }
        if (i == anomalyBuckets[anomaly]) {
            ++anomaly;
        }

        model->sample(time, time + bucketLength, m_ResourceMonitor);

        for (std::size_t pid = 0; pid < samples[i].size(); ++pid) {
            SAnnotatedProbability p;
            CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING);
            BOOST_TEST_REQUIRE(model->computeProbability(
                pid, time, time + bucketLength, partitioningFields, 1, p));
            std::string correlated;
            if (p.s_AttributeProbabilities[0].s_CorrelatedAttributes.size() > 0 &&
                p.s_AttributeProbabilities[0].s_CorrelatedAttributes[0] &&
                p.s_AttributeProbabilities[0].s_Type.isUnconditional() == false) {
                correlated = *p.s_AttributeProbabilities[0].s_CorrelatedAttributes[0];
            }
            probabilities[pid].add(TDoubleSizeStrTr(p.s_Probability, i, correlated));
        }

        time += bucketLength;
    }

    TStrVecVec expectedResults{{"1950,p2", "2700,p2"},
                               {"1950,p1", "2700,p1"},
                               {"2400,p4", "2700,p4"},
                               {"2400,p3", "2700,p3"}};
    for (std::size_t i = 0; i < 4; ++i) {
        LOG_DEBUG(<< "probabilities = " << probabilities[i]);
        TStrVec results;
        for (const auto& result : probabilities[i]) {
            results.push_back(core::CStringUtils::typeToString(result.second) +
                              "," + result.third);
        }
        for (const auto& expectedResult : expectedResults[i]) {
            BOOST_TEST_REQUIRE(std::find(results.begin(), results.end(),
                                         expectedResult) != results.end());
        }
    }
}

BOOST_FIXTURE_TEST_CASE(testPrune, CTestFixture) {
    using TUInt64VecVec = std::vector<TUInt64Vec>;
    using TEventDataVec = std::vector<CEventData>;
    using TSizeSizeMap = std::map<std::size_t, std::size_t>;

    const core_t::TTime startTime{1346968800};
    const core_t::TTime bucketLength{3600};

    const TStrVec people{"p1", "p2", "p3", "p4", "p5", "p6"};

    TUInt64VecVec eventCounts;
    eventCounts.push_back(TUInt64Vec(1000, 0));
    eventCounts[0][0] = 4;
    eventCounts[0][1] = 3;
    eventCounts[0][2] = 5;
    eventCounts[0][4] = 2;
    eventCounts.push_back(TUInt64Vec(1000, 1));
    eventCounts.push_back(TUInt64Vec(1000, 0));
    eventCounts[2][1] = 10;
    eventCounts[2][2] = 13;
    eventCounts[2][8] = 5;
    eventCounts[2][15] = 2;
    eventCounts.push_back(TUInt64Vec(1000, 0));
    eventCounts[3][2] = 13;
    eventCounts[3][8] = 9;
    eventCounts[3][15] = 12;
    eventCounts.push_back(TUInt64Vec(1000, 2));
    eventCounts.push_back(TUInt64Vec(1000, 1));

    TSizeVec expectedPeople{1, 4, 5};

    SModelParams params(bucketLength);
    params.s_DecayRate = 0.01;
    model_t::TFeatureVec features;
    features.push_back(model_t::E_IndividualNonZeroCountByBucketAndPerson);
    features.push_back(model_t::E_IndividualTotalBucketCountByPerson);
    CModelFactory::TDataGathererPtr gatherer;
    CModelFactory::TModelPtr model_;
    this->makeModelT<CEventRateModelFactory>(
        params, features, startTime, model_t::E_EventRateOnline, gatherer, model_);
    auto* model = dynamic_cast<CEventRateModel*>(model_.get());
    BOOST_TEST_REQUIRE(model);
    CModelFactory::TDataGathererPtr expectedGatherer;
    CModelFactory::TModelPtr expectedModel_;
    this->makeModelT<CEventRateModelFactory>(params, features, startTime,
                                             model_t::E_EventRateOnline,
                                             expectedGatherer, expectedModel_);
    auto* expectedModel = dynamic_cast<CEventRateModel*>(expectedModel_.get());
    BOOST_TEST_REQUIRE(expectedModel);

    TEventDataVec events;
    for (std::size_t i = 0; i < eventCounts.size(); ++i) {
        TTimeVec eventTimes;
        generateEvents(startTime, bucketLength, eventCounts[i], eventTimes);
        if (eventTimes.empty() == false) {
            std::sort(eventTimes.begin(), eventTimes.end());
            std::size_t pid =
                this->addPerson(people[i], gatherer);
            for (auto time : eventTimes) {
                events.push_back(makeEventData(time, pid));
            }
        }
    }
    std::sort(events.begin(), events.end(),
              [](const CEventData& lhs, const CEventData& rhs) {
                  return lhs.time() < rhs.time();
              });

    TEventDataVec expectedEvents;
    expectedEvents.reserve(events.size());
    TSizeSizeMap mapping;
    for (auto person : expectedPeople) {
        mapping[person] = this->addPerson(people[person], expectedGatherer);
    }
    for (const auto& event : events) {
        if (std::binary_search(expectedPeople.begin(), expectedPeople.end(),
                               event.personId())) {
            expectedEvents.push_back(makeEventData(event.time(), mapping[*event.personId()]));
        }
    }
    for (auto person : expectedPeople) {
        this->addPerson(people[person], expectedGatherer);
    }

    core_t::TTime bucketStart = startTime;
    for (const auto& event : events) {
        while (event.time() >= bucketStart + bucketLength) {
            model->sample(bucketStart, bucketStart + bucketLength, m_ResourceMonitor);
            bucketStart += bucketLength;
        }
        this->addArrival(SMessage(event.time(), gatherer->personName(*event.personId()),
                                  TOptionalDouble()),
                         gatherer);
    }
    model->sample(bucketStart, bucketStart + bucketLength, m_ResourceMonitor);
    size_t maxDimensionBeforePrune(model->dataGatherer().maxDimension());
    model->prune(model->defaultPruneWindow());
    size_t maxDimensionAfterPrune(model->dataGatherer().maxDimension());
    BOOST_REQUIRE_EQUAL(maxDimensionBeforePrune, maxDimensionAfterPrune);

    bucketStart = maths::common::CIntegerTools::floor(expectedEvents[0].time(), bucketLength);
    for (const auto& event : expectedEvents) {
        while (event.time() >= bucketStart + bucketLength) {
            expectedModel->sample(bucketStart, bucketStart + bucketLength, m_ResourceMonitor);
            bucketStart += bucketLength;
        }
        this->addArrival(SMessage(event.time(),
                                  expectedGatherer->personName(*event.personId()),
                                  TOptionalDouble()),
                         expectedGatherer);
    }
    expectedModel->sample(bucketStart, bucketStart + bucketLength, m_ResourceMonitor);

    LOG_DEBUG(<< "checksum = " << model->checksum());
    LOG_DEBUG(<< "expected checksum = " << expectedModel->checksum());
    BOOST_REQUIRE_EQUAL(expectedModel->checksum(), model->checksum());

    // Now check that we recycle the person slots.
    bucketStart = gatherer->currentBucketStartTime() + bucketLength;
    TStrVec newPeople{"p7", "p8", "p9"};
    for (const auto& person : newPeople) {
        std::size_t newPid = this->addPerson(person, gatherer);
        BOOST_TEST_REQUIRE(newPid < 6);
        std::size_t expectedNewPid = this->addPerson(person, expectedGatherer);

        this->addArrival(SMessage(bucketStart + 1, gatherer->personName(newPid),
                                  TOptionalDouble()),
                         gatherer);
        this->addArrival(SMessage(bucketStart + 2000, gatherer->personName(newPid),
                                  TOptionalDouble()),
                         gatherer);
        this->addArrival(SMessage(bucketStart + 1,
                                  expectedGatherer->personName(expectedNewPid),
                                  TOptionalDouble()),
                         expectedGatherer);
        this->addArrival(SMessage(bucketStart + 2000,
                                  expectedGatherer->personName(expectedNewPid),
                                  TOptionalDouble()),
                         expectedGatherer);
    }
    model->sample(bucketStart, bucketStart + bucketLength, m_ResourceMonitor);
    expectedModel->sample(bucketStart, bucketStart + bucketLength, m_ResourceMonitor);

    LOG_DEBUG(<< "checksum = " << model->checksum());
    LOG_DEBUG(<< "expected checksum = " << expectedModel->checksum());
    BOOST_REQUIRE_EQUAL(expectedModel->checksum(), model->checksum());

    // Test that calling prune on a cloned model which has seen no new data does nothing.
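    // Since no data has arrived since the prune above, pruning the clone
    // should leave its set of active people unchanged.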
    CModelFactory::TModelPtr clonedModel(model->cloneForPersistence());
    std::size_t numberOfPeopleBeforePrune(clonedModel->dataGatherer().numberActivePeople());
    BOOST_TEST_REQUIRE(numberOfPeopleBeforePrune > 0);
    clonedModel->prune(clonedModel->defaultPruneWindow());
    BOOST_REQUIRE_EQUAL(numberOfPeopleBeforePrune,
                        clonedModel->dataGatherer().numberActivePeople());
}

BOOST_FIXTURE_TEST_CASE(testKey, CTestFixture) {
    function_t::TFunctionVec countFunctions{function_t::E_IndividualCount,
                                            function_t::E_IndividualNonZeroCount,
                                            function_t::E_IndividualRareCount,
                                            function_t::E_IndividualRareNonZeroCount,
                                            function_t::E_IndividualRare,
                                            function_t::E_IndividualLowCounts,
                                            function_t::E_IndividualHighCounts};

    std::string fieldName;
    std::string overFieldName;

    generateAndCompareKey(countFunctions, fieldName, overFieldName,
                          [](CSearchKey expectedKey, CSearchKey actualKey) {
                              BOOST_TEST_REQUIRE(expectedKey == actualKey);
                          });
}

BOOST_FIXTURE_TEST_CASE(testModelsWithValueFields, CTestFixture) {
    // Check that attribute conditional features are correctly marked as
    // such: create some models with attribute conditional data and check
    // that the values vary accordingly.
    LOG_DEBUG(<< "*** testModelsWithValueFields ***");
    {
        // Check E_PopulationUniqueCountByBucketPersonAndAttribute.
        std::size_t anomalousBucket{20};
        std::size_t numberBuckets{30};

        TStrVec strings{"p1", "c1", "c2"};

        TSizeVecVecVec fieldsPerBucket;
        for (std::size_t i = 0; i < numberBuckets; i++) {
            TSizeVecVec fields;
            std::size_t attribute1Strings = 10;
            std::size_t attribute2Strings = 10;
            if (i == anomalousBucket) {
                attribute1Strings = 5;
                attribute2Strings = 15;
            }
            for (std::size_t j = 0; j < std::max(attribute1Strings, attribute2Strings); j++) {
                std::ostringstream ss1;
                std::ostringstream ss2;
                ss1 << "one_plus_" << i << "_" << j;
                ss2 << "two_plus_" << j;
                strings.push_back(ss1.str());
                strings.push_back(ss2.str());
                if (j < attribute1Strings) {
                    TSizeVec f;
                    f.push_back(0);
                    f.push_back(1);
                    f.push_back(strings.size() - 2);
                    fields.push_back(f);
                }
                if (j < attribute2Strings) {
                    TSizeVec f;
                    f.push_back(0);
                    f.push_back(2);
                    f.push_back(strings.size() - 1);
                    fields.push_back(f);
                }
            }
            fieldsPerBucket.push_back(fields);
        }

        testModelWithValueField(model_t::E_PopulationUniqueCountByBucketPersonAndAttribute,
                                fieldsPerBucket, strings, m_ResourceMonitor);
    }
    {
        // Check E_PopulationInfoContentByBucketPersonAndAttribute.
        std::size_t anomalousBucket{20};
        std::size_t numberBuckets{30};

        TStrVec strings{
            "p1", "c1", "c2",
            "trwh5jks9djadkn453hgfadadfjhadhfkdhakj4hkahdlagl4iuygalshkdjbvlaus4hliu4WHGFLIUSDHLKAJ",
            "2H4G55HALFMN569DNIVJ55B3BSJXU;4VBQ-LKDFNUE9HNV904U5QGA;DDFLVJKF95NSD,MMVASD.,A.4,A.SD4",
            "a", "b"};

        TSizeVecVecVec fieldsPerBucket;
        for (std::size_t i = 0; i < numberBuckets; i++) {
            TSizeVecVec fields;
            TSizeVec fb;
            if (i == anomalousBucket) {
                // Load "c1" with "a" and "b".
                fields.push_back(TSizeVec());
                fields.back().push_back(0);
                fields.back().push_back(1);
                fields.back().push_back(5);
                fields.push_back(TSizeVec());
                fields.back().push_back(0);
                fields.back().push_back(1);
                fields.back().push_back(6);
                // Load "c2" with the random strings.
                fields.push_back(TSizeVec());
                fields.back().push_back(0);
                fields.back().push_back(2);
                fields.back().push_back(3);
                fields.push_back(TSizeVec());
                fields.back().push_back(0);
                fields.back().push_back(2);
                fields.back().push_back(4);
            } else {
                // Load "c1" and "c2" with similarly random strings.
                fields.push_back(TSizeVec());
                fields.back().push_back(0);
                fields.back().push_back(1);
                fields.back().push_back(6);
                fields.push_back(TSizeVec());
                fields.back().push_back(0);
                fields.back().push_back(2);
                fields.back().push_back(4);
                fields.push_back(TSizeVec());
                fields.back().push_back(0);
                fields.back().push_back(2);
                fields.back().push_back(5);
            }
            fieldsPerBucket.push_back(fields);
        }

        testModelWithValueField(model_t::E_PopulationInfoContentByBucketPersonAndAttribute,
                                fieldsPerBucket, strings, m_ResourceMonitor);
    }
}

BOOST_FIXTURE_TEST_CASE(testCountProbabilityCalculationWithInfluence, CTestFixture) {
    const core_t::TTime startTime{1346968800};
    const core_t::TTime bucketLength{3600};

    {
        // Test single influence name, single influence value.
        SModelParams params(bucketLength);
        params.s_DecayRate = 0.001;
        auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength);
        CEventRateModelFactory factory(params, interimBucketCorrector);
        TStrVec influenceFieldNames{"IF1"};
        factory.fieldNames("", "", "", "", influenceFieldNames);
        factory.features({model_t::E_IndividualCountByBucketAndPerson});
        CModelFactory::TDataGathererPtr gatherer(factory.makeDataGatherer(startTime));
        BOOST_REQUIRE_EQUAL(0, this->addPerson("p", gatherer, 1));
        CModelFactory::TModelPtr modelHolder(factory.makeModel(gatherer));
        auto* model = dynamic_cast<CEventRateModel*>(modelHolder.get());
        BOOST_TEST_REQUIRE(model);

        // Generate some events.
        TTimeVec eventTimes;
        TUInt64Vec expectedEventCounts = rawEventCounts();
        expectedEventCounts.back() *= 3;
        generateEvents(startTime, bucketLength, expectedEventCounts, eventTimes);
        core_t::TTime endTime = (eventTimes.back() / bucketLength + 1) * bucketLength;
        LOG_DEBUG(<< "startTime = " << startTime << ", endTime = " << endTime
                  << ", # events = " << eventTimes.size());

        SAnnotatedProbability::TOptionalStrOptionalStrPrDoublePrVec lastInfluencersResult;
        std::size_t i{0};
        std::size_t j{0};
        for (core_t::TTime bucketStartTime = startTime; bucketStartTime < endTime;
             bucketStartTime += bucketLength, ++j) {
            core_t::TTime bucketEndTime = bucketStartTime + bucketLength;

            double count = 0.0;
            for (; i < eventTimes.size() && eventTimes[i] < bucketEndTime; ++i) {
                this->addArrival(SMessage(eventTimes[i], "p", {}, {}, "inf1"), gatherer);
                count += 1.0;
            }
            LOG_DEBUG(<< "Bucket count = " << count);

            model->sample(bucketStartTime, bucketEndTime, m_ResourceMonitor);

            SAnnotatedProbability annotatedProbability;
            CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING);
            BOOST_TEST_REQUIRE(model->computeProbability(
                0 /*pid*/, bucketStartTime, bucketEndTime, partitioningFields,
                1, annotatedProbability));
            LOG_DEBUG(<< "probability = " << annotatedProbability.s_Probability);
            LOG_DEBUG(<< "influencers = " << annotatedProbability.s_Influences);
            BOOST_TEST_REQUIRE(annotatedProbability.s_Probability);
            lastInfluencersResult = annotatedProbability.s_Influences;
        }

        // All the influence should be assigned to our one influencer.
        BOOST_REQUIRE_EQUAL(std::string("[((IF1, inf1), 1)]"),
                            core::CContainerPrinter::print(lastInfluencersResult));
    }
    {
        // Test single influence name, two influence values.
        SModelParams params(bucketLength);
        params.s_DecayRate = 0.001;
        auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength);
        CEventRateModelFactory factory(params, interimBucketCorrector);
        TStrVec influenceFieldNames{"IF1"};
        factory.fieldNames("", "", "", "", influenceFieldNames);
        factory.features({model_t::E_IndividualCountByBucketAndPerson});
        CModelFactory::TDataGathererPtr gatherer(factory.makeDataGatherer(startTime));
        BOOST_REQUIRE_EQUAL(0,
this->addPerson("p", gatherer, 1)); CModelFactory::TModelPtr modelHolder(factory.makeModel(gatherer)); auto* model = dynamic_cast<CEventRateModel*>(modelHolder.get()); BOOST_TEST_REQUIRE(model); // Generate some events. TTimeVec eventTimes; TUInt64Vec expectedEventCounts = rawEventCounts(); expectedEventCounts.back() *= 3; generateEvents(startTime, bucketLength, expectedEventCounts, eventTimes); core_t::TTime endTime = (eventTimes.back() / bucketLength + 1) * bucketLength; LOG_DEBUG(<< "startTime = " << startTime << ", endTime = " << endTime << ", # events = " << eventTimes.size()); SAnnotatedProbability::TOptionalStrOptionalStrPrDoublePrVec lastInfluencersResult; std::size_t i{0}; std::size_t j{0}; for (core_t::TTime bucketStartTime = startTime; bucketStartTime < endTime; bucketStartTime += bucketLength, ++j) { core_t::TTime bucketEndTime = bucketStartTime + bucketLength; double count{0.0}; for (; i < eventTimes.size() && eventTimes[i] < bucketEndTime; ++i) { std::stringstream ss; ss << "inf" << (i % 2); const std::string inf(ss.str()); this->addArrival(SMessage(eventTimes[i], "p", {}, {}, inf), gatherer); count += 1.0; } LOG_DEBUG(<< "Bucket count = " << count); model->sample(bucketStartTime, bucketEndTime, m_ResourceMonitor); SAnnotatedProbability annotatedProbability; CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING); BOOST_TEST_REQUIRE(model->computeProbability( 0 /*pid*/, bucketStartTime, bucketEndTime, partitioningFields, 1, annotatedProbability)); LOG_DEBUG(<< "probability = " << annotatedProbability.s_Probability); LOG_DEBUG(<< "influencers = " << annotatedProbability.s_Influences); BOOST_TEST_REQUIRE(annotatedProbability.s_Probability); lastInfluencersResult = annotatedProbability.s_Influences; } // We expect equal influence since the influencers share the count. // Also the count would be fairly normal if either influencer were // removed so their influence is high. BOOST_REQUIRE_EQUAL(2, lastInfluencersResult.size()); BOOST_REQUIRE_CLOSE_ABSOLUTE(lastInfluencersResult[0].second, lastInfluencersResult[1].second, 0.05); BOOST_TEST_REQUIRE(lastInfluencersResult[0].second > 0.75); } { // Test single influence name, two influence values, low influence SModelParams params(bucketLength); params.s_DecayRate = 0.001; auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength); CEventRateModelFactory factory(params, interimBucketCorrector); TStrVec influenceFieldNames{"IF1"}; factory.fieldNames("", "", "", "", influenceFieldNames); factory.features({model_t::E_IndividualCountByBucketAndPerson}); CModelFactory::TDataGathererPtr gatherer(factory.makeDataGatherer(startTime)); BOOST_REQUIRE_EQUAL(0, this->addPerson("p", gatherer, 1)); CModelFactory::TModelPtr modelHolder(factory.makeModel(gatherer)); auto* model = dynamic_cast<CEventRateModel*>(modelHolder.get()); BOOST_TEST_REQUIRE(model); // Generate some events. 
        TTimeVec eventTimes;
        TUInt64Vec expectedEventCounts = rawEventCounts();
        expectedEventCounts.back() *= 6;
        generateEvents(startTime, bucketLength, expectedEventCounts, eventTimes);
        core_t::TTime endTime = (eventTimes.back() / bucketLength + 1) * bucketLength;
        LOG_DEBUG(<< "startTime = " << startTime << ", endTime = " << endTime
                  << ", # events = " << eventTimes.size());

        SAnnotatedProbability::TOptionalStrOptionalStrPrDoublePrVec lastInfluencersResult;
        std::size_t i{0};
        std::size_t j{0};
        for (core_t::TTime bucketStartTime = startTime; bucketStartTime < endTime;
             bucketStartTime += bucketLength, ++j) {
            core_t::TTime bucketEndTime = bucketStartTime + bucketLength;

            double count{0.0};
            for (; i < eventTimes.size() && eventTimes[i] < bucketEndTime; ++i) {
                std::stringstream ss;
                ss << "inf" << (i % 2);
                const std::string inf(ss.str());
                this->addArrival(SMessage(eventTimes[i], "p", {}, {}, inf), gatherer);
                count += 1.0;
            }
            LOG_DEBUG(<< "Bucket count = " << count);

            model->sample(bucketStartTime, bucketEndTime, m_ResourceMonitor);

            SAnnotatedProbability annotatedProbability;
            CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING);
            BOOST_TEST_REQUIRE(model->computeProbability(
                0 /*pid*/, bucketStartTime, bucketEndTime, partitioningFields,
                1, annotatedProbability));
            LOG_DEBUG(<< "probability = " << annotatedProbability.s_Probability);
            LOG_DEBUG(<< "influencers = " << annotatedProbability.s_Influences);
            BOOST_TEST_REQUIRE(annotatedProbability.s_Probability);
            lastInfluencersResult = annotatedProbability.s_Influences;
        }

        // We expect equal influence since the influencers share the count.
        // However, the bucket is still significantly anomalous omitting
        // the records from either influencer so their influence is smaller.
        BOOST_REQUIRE_EQUAL(2, lastInfluencersResult.size());
        BOOST_REQUIRE_CLOSE_ABSOLUTE(lastInfluencersResult[0].second,
                                     lastInfluencersResult[1].second, 0.05);
        BOOST_TEST_REQUIRE(lastInfluencersResult[0].second > 0.5);
        BOOST_TEST_REQUIRE(lastInfluencersResult[0].second < 0.6);
    }
    {
        // Test single influence name, two asymmetric influence values.
        SModelParams params(bucketLength);
        params.s_DecayRate = 0.001;
        auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength);
        CEventRateModelFactory factory(params, interimBucketCorrector);
        TStrVec influenceFieldNames{"IF1"};
        factory.fieldNames("", "", "", "", influenceFieldNames);
        factory.features({model_t::E_IndividualCountByBucketAndPerson});
        CModelFactory::TDataGathererPtr gatherer(factory.makeDataGatherer(startTime));
        BOOST_REQUIRE_EQUAL(0, this->addPerson("p", gatherer, 1));
        CModelFactory::TModelPtr modelHolder(factory.makeModel(gatherer));
        auto* model = dynamic_cast<CEventRateModel*>(modelHolder.get());
        BOOST_TEST_REQUIRE(model);

        // Generate some events.
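        // Every tenth event below is tagged with a separate "inf_extra"
        // influencer value; the other 90% share the single value "inf".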
        TTimeVec eventTimes;
        TUInt64Vec expectedEventCounts = rawEventCounts();
        expectedEventCounts.back() *= 3;
        generateEvents(startTime, bucketLength, expectedEventCounts, eventTimes);
        core_t::TTime endTime = (eventTimes.back() / bucketLength + 1) * bucketLength;
        LOG_DEBUG(<< "startTime = " << startTime << ", endTime = " << endTime
                  << ", # events = " << eventTimes.size());

        SAnnotatedProbability::TOptionalStrOptionalStrPrDoublePrVec lastInfluencersResult;
        std::size_t i{0};
        std::size_t j{0};
        for (core_t::TTime bucketStartTime = startTime; bucketStartTime < endTime;
             bucketStartTime += bucketLength, ++j) {
            core_t::TTime bucketEndTime = bucketStartTime + bucketLength;

            double count{0.0};
            for (; i < eventTimes.size() && eventTimes[i] < bucketEndTime; ++i) {
                std::stringstream ss;
                ss << "inf";
                if (i % 10 == 0) {
                    ss << "_extra";
                }
                const std::string inf(ss.str());
                this->addArrival(SMessage(eventTimes[i], "p", {}, {}, inf), gatherer);
                count += 1.0;
            }
            LOG_DEBUG(<< "Bucket count = " << count);

            model->sample(bucketStartTime, bucketEndTime, m_ResourceMonitor);

            SAnnotatedProbability annotatedProbability;
            CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING);
            BOOST_TEST_REQUIRE(model->computeProbability(
                0 /*pid*/, bucketStartTime, bucketEndTime, partitioningFields,
                1, annotatedProbability));
            LOG_DEBUG(<< "probability = " << annotatedProbability.s_Probability);
            LOG_DEBUG(<< "influencers = " << annotatedProbability.s_Influences);
            BOOST_TEST_REQUIRE(annotatedProbability.s_Probability);
            lastInfluencersResult = annotatedProbability.s_Influences;
        }

        // The influence should be dominated by the first influencer, and the
        // "_extra" influencers should be dropped by the cutoff threshold.
        BOOST_REQUIRE_EQUAL(1, lastInfluencersResult.size());
        BOOST_TEST_REQUIRE(lastInfluencersResult[0].second > 0.99);
    }
    {
        // Test two influence names, two asymmetric influence values in each.
        SModelParams params(bucketLength);
        params.s_DecayRate = 0.001;
        auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength);
        CEventRateModelFactory factory(params, interimBucketCorrector);
        TStrVec influenceFieldNames{"IF1", "IF2"};
        factory.fieldNames("", "", "", "", influenceFieldNames);
        factory.features({model_t::E_IndividualCountByBucketAndPerson});
        CModelFactory::TDataGathererPtr gatherer(factory.makeDataGatherer(startTime));
        BOOST_REQUIRE_EQUAL(0, this->addPerson("p", gatherer, 2));
        CModelFactory::TModelPtr modelHolder(factory.makeModel(gatherer));
        auto* model = dynamic_cast<CEventRateModel*>(modelHolder.get());
        BOOST_TEST_REQUIRE(model);

        // Generate some events.
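        // Each event below carries two influencer fields: "IF1" takes "inf"
        // or "inf_extra" and "IF2" takes the corresponding "_another"
        // variant, so both fields should behave symmetrically.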
        TTimeVec eventTimes;
        TUInt64Vec expectedEventCounts = rawEventCounts();
        expectedEventCounts.back() *= 3;
        generateEvents(startTime, bucketLength, expectedEventCounts, eventTimes);
        core_t::TTime endTime = (eventTimes.back() / bucketLength + 1) * bucketLength;
        LOG_DEBUG(<< "startTime = " << startTime << ", endTime = " << endTime
                  << ", # events = " << eventTimes.size());

        SAnnotatedProbability::TOptionalStrOptionalStrPrDoublePrVec lastInfluencersResult;
        std::size_t i{0};
        std::size_t j{0};
        for (core_t::TTime bucketStartTime = startTime; bucketStartTime < endTime;
             bucketStartTime += bucketLength, ++j) {
            core_t::TTime bucketEndTime = bucketStartTime + bucketLength;

            double count{0.0};
            for (; i < eventTimes.size() && eventTimes[i] < bucketEndTime; ++i) {
                std::stringstream ss;
                ss << "inf";
                if (i % 10 == 0) {
                    ss << "_extra";
                }
                const std::string inf1(ss.str());
                ss << "_another";
                const std::string inf2(ss.str());
                this->addArrival(SMessage(eventTimes[i], "p", {}, {}, inf1, inf2), gatherer);
                count += 1.0;
            }
            LOG_DEBUG(<< "Bucket count = " << count);

            model->sample(bucketStartTime, bucketEndTime, m_ResourceMonitor);

            SAnnotatedProbability annotatedProbability;
            CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING);
            BOOST_TEST_REQUIRE(model->computeProbability(
                0 /*pid*/, bucketStartTime, bucketEndTime, partitioningFields,
                1, annotatedProbability));
            LOG_DEBUG(<< "probability = " << annotatedProbability.s_Probability);
            LOG_DEBUG(<< "influencers = " << annotatedProbability.s_Influences);
            BOOST_TEST_REQUIRE(annotatedProbability.s_Probability);
            lastInfluencersResult = annotatedProbability.s_Influences;
        }

        // The influence should be dominated by the first influencer for both
        // fields, and the "_extra" influencers should be dropped by the
        // cutoff threshold.
        BOOST_REQUIRE_EQUAL(std::string("IF2"), *lastInfluencersResult[0].first.first);
        BOOST_REQUIRE_EQUAL(std::string("inf_another"),
                            *lastInfluencersResult[0].first.second);
        BOOST_REQUIRE_EQUAL(2, lastInfluencersResult.size());
        BOOST_REQUIRE_EQUAL(std::string("IF1"), *lastInfluencersResult[1].first.first);
        BOOST_REQUIRE_EQUAL(std::string("inf"), *lastInfluencersResult[1].first.second);
        BOOST_REQUIRE_EQUAL(lastInfluencersResult[0].second, lastInfluencersResult[1].second);
        BOOST_TEST_REQUIRE(lastInfluencersResult[0].second > 0.99);
    }
    {
        // The influencer is one of the partitioning fields.
        SModelParams params(bucketLength);
        params.s_DecayRate = 0.001;
        auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength);
        CEventRateModelFactory factory(params, interimBucketCorrector);
        std::string byFieldName{"P"};
        factory.fieldNames("", "", byFieldName, "", {byFieldName});
        factory.features({model_t::E_IndividualCountByBucketAndPerson});
        CModelFactory::TDataGathererPtr gatherer(factory.makeDataGatherer(startTime));
        BOOST_REQUIRE_EQUAL(0, this->addPerson("p", gatherer, 1));
        CModelFactory::TModelPtr modelHolder(factory.makeModel(gatherer));
        auto* model = dynamic_cast<CEventRateModel*>(modelHolder.get());
        BOOST_TEST_REQUIRE(model);

        // Generate some events.
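        // The by-field value "p" doubles as the influencer value here, and
        // the final sample covers an empty bucket to check influences are
        // still produced for it.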
TTimeVec eventTimes; TUInt64Vec expectedEventCounts = rawEventCounts(); expectedEventCounts.back() *= 3; generateEvents(startTime, bucketLength, expectedEventCounts, eventTimes); core_t::TTime endTime = (eventTimes.back() / bucketLength + 1) * bucketLength; LOG_DEBUG(<< "startTime = " << startTime << ", endTime = " << endTime << ", # events = " << eventTimes.size()); SAnnotatedProbability::TOptionalStrOptionalStrPrDoublePrVec lastInfluencersResult; core_t::TTime bucketStartTime = startTime; core_t::TTime bucketEndTime = startTime + bucketLength; for (std::size_t i = 0, j = 0; bucketStartTime < endTime; bucketStartTime += bucketLength, bucketEndTime += bucketLength, ++j) { for (; i < eventTimes.size() && eventTimes[i] < bucketEndTime; ++i) { this->addArrival(SMessage(eventTimes[i], "p", {}, {}, "p"), gatherer); } model->sample(bucketStartTime, bucketEndTime, m_ResourceMonitor); } // Check we still have influences for an empty bucket. model->sample(bucketStartTime, bucketEndTime, m_ResourceMonitor); SAnnotatedProbability annotatedProbability; CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING); partitioningFields.add(byFieldName, EMPTY_STRING); BOOST_TEST_REQUIRE(model->computeProbability(0 /*pid*/, bucketStartTime, bucketEndTime, partitioningFields, 1, annotatedProbability)); LOG_DEBUG(<< "probability = " << annotatedProbability.s_Probability); LOG_DEBUG(<< "influencers = " << annotatedProbability.s_Influences); BOOST_REQUIRE_EQUAL(false, annotatedProbability.s_Influences.empty()); } } BOOST_FIXTURE_TEST_CASE(testDistinctCountProbabilityCalculationWithInfluence, CTestFixture) { const core_t::TTime startTime{1346968800}; const core_t::TTime bucketLength{3600}; { // Test single influence name, single influence value SModelParams params(bucketLength); params.s_DecayRate = 0.001; auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength); CEventRateModelFactory factory(params, interimBucketCorrector); TStrVec influenceFieldNames{"IF1"}; factory.fieldNames("", "", "", "foo", influenceFieldNames); factory.features({model_t::E_IndividualUniqueCountByBucketAndPerson}); CModelFactory::TDataGathererPtr gatherer(factory.makeDataGatherer(startTime)); BOOST_REQUIRE_EQUAL(0, this->addPerson("p", gatherer, 1, "v")); CModelFactory::TModelPtr modelHolder(factory.makeModel(gatherer)); auto* model = dynamic_cast<CEventRateModel*>(modelHolder.get()); BOOST_TEST_REQUIRE(model); const std::string uniqueValue("str_value"); // Generate some events. 
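// Every regular event carries the same value "str_value", so the per-bucket
// unique count stays at 1 during training. The last bucket then receives 20
// extra distinct values ("str_value_0" .. "str_value_19"), producing a
// unique-count anomaly that should be attributed entirely to "inf1".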
TTimeVec eventTimes; TUInt64Vec expectedEventCounts = rawEventCounts(); generateEvents(startTime, bucketLength, expectedEventCounts, eventTimes); core_t::TTime endTime = (eventTimes.back() / bucketLength + 1) * bucketLength; LOG_DEBUG(<< "startTime = " << startTime << ", endTime = " << endTime << ", # events = " << eventTimes.size()); SAnnotatedProbability::TOptionalStrOptionalStrPrDoublePrVec lastInfluencersResult; std::size_t i{0}; std::size_t j{0}; for (core_t::TTime bucketStartTime = startTime; bucketStartTime < endTime; bucketStartTime += bucketLength, ++j) { core_t::TTime bucketEndTime = bucketStartTime + bucketLength; double count{0.0}; for (; i < eventTimes.size() && eventTimes[i] < bucketEndTime; ++i) { this->addArrival(SMessage(eventTimes[i], "p", {}, {}, "inf1", uniqueValue), gatherer); count += 1.0; } if (i == eventTimes.size()) { // Generate anomaly LOG_DEBUG(<< "Generating anomaly"); for (std::size_t k = 0; k < 20; k++) { std::stringstream ss; ss << uniqueValue << "_" << k; this->addArrival( SMessage(eventTimes[i - 1], "p", {}, {}, "inf1", ss.str()), gatherer); } } LOG_DEBUG(<< "Bucket count = " << count); model->sample(bucketStartTime, bucketEndTime, m_ResourceMonitor); SAnnotatedProbability annotatedProbability; CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING); BOOST_TEST_REQUIRE(model->computeProbability( 0 /*pid*/, bucketStartTime, bucketEndTime, partitioningFields, 1, annotatedProbability)); LOG_DEBUG(<< "probability = " << annotatedProbability.s_Probability); LOG_DEBUG(<< "influencers = " << annotatedProbability.s_Influences); BOOST_TEST_REQUIRE(annotatedProbability.s_Probability); lastInfluencersResult = annotatedProbability.s_Influences; } // All the influence should be assigned to our one influencer BOOST_REQUIRE_EQUAL(std::string("[((IF1, inf1), 1)]"), core::CContainerPrinter::print(lastInfluencersResult)); } { // Test single influence name, two influence values SModelParams params(bucketLength); params.s_DecayRate = 0.001; auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength); CEventRateModelFactory factory(params, interimBucketCorrector); TStrVec influenceFieldNames{"IF1"}; factory.fieldNames("", "", "", "foo", influenceFieldNames); factory.features({model_t::E_IndividualUniqueCountByBucketAndPerson}); CModelFactory::TDataGathererPtr gatherer(factory.makeDataGatherer(startTime)); BOOST_REQUIRE_EQUAL(0, this->addPerson("p", gatherer, 1, "v")); CModelFactory::TModelPtr modelHolder(factory.makeModel(gatherer)); auto* model = dynamic_cast<CEventRateModel*>(modelHolder.get()); BOOST_TEST_REQUIRE(model); const std::string uniqueValue("str_value"); // Generate some events. 
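// In the anomalous bucket the extra distinct values alternate between the
// "inf1" and "inf2" influencers (even k -> inf1, odd k -> inf2), so each
// influencer accounts for roughly half of the unique-count spike and both
// should receive similarly large influence weights.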
TTimeVec eventTimes; TUInt64Vec expectedEventCounts = rawEventCounts(); generateEvents(startTime, bucketLength, expectedEventCounts, eventTimes); core_t::TTime endTime = (eventTimes.back() / bucketLength + 1) * bucketLength; LOG_DEBUG(<< "startTime = " << startTime << ", endTime = " << endTime << ", # events = " << eventTimes.size()); SAnnotatedProbability::TOptionalStrOptionalStrPrDoublePrVec lastInfluencersResult; std::size_t i{0}; std::size_t j{0}; for (core_t::TTime bucketStartTime = startTime; bucketStartTime < endTime; bucketStartTime += bucketLength, ++j) { core_t::TTime bucketEndTime = bucketStartTime + bucketLength; double count{0.0}; for (; i < eventTimes.size() && eventTimes[i] < bucketEndTime; ++i) { this->addArrival(SMessage(eventTimes[i], "p", {}, {}, "inf1", uniqueValue), gatherer); count += 1.0; } if (i == eventTimes.size()) { // Generate anomaly LOG_DEBUG(<< "Generating anomaly"); for (std::size_t k = 1; k < 20; k++) { std::stringstream ss; ss << uniqueValue << "_" << k; if (k % 2 == 0) { this->addArrival(SMessage(eventTimes[i - 1], "p", {}, {}, "inf1", ss.str()), gatherer); } else { this->addArrival(SMessage(eventTimes[i - 1], "p", {}, {}, "inf2", ss.str()), gatherer); } } } LOG_DEBUG(<< "Bucket count = " << count); model->sample(bucketStartTime, bucketEndTime, m_ResourceMonitor); SAnnotatedProbability annotatedProbability; CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING); BOOST_TEST_REQUIRE(model->computeProbability( 0 /*pid*/, bucketStartTime, bucketEndTime, partitioningFields, 1, annotatedProbability)); LOG_DEBUG(<< "probability = " << annotatedProbability.s_Probability); LOG_DEBUG(<< "influencers = " << annotatedProbability.s_Influences); BOOST_TEST_REQUIRE(annotatedProbability.s_Probability); lastInfluencersResult = annotatedProbability.s_Influences; } // The influence should be shared by the two influencers, and as the anomaly // is about twice the regular count, each influencer contributes a lot to // the anomaly BOOST_REQUIRE_EQUAL(2, lastInfluencersResult.size()); BOOST_REQUIRE_CLOSE_ABSOLUTE(lastInfluencersResult[0].second, lastInfluencersResult[1].second, 0.05); BOOST_TEST_REQUIRE(lastInfluencersResult[0].second > 0.6); } { // Test single influence name, two asymmetric influence values SModelParams params(bucketLength); params.s_DecayRate = 0.001; auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength); CEventRateModelFactory factory(params, interimBucketCorrector); TStrVec influenceFieldNames{"IF1"}; factory.fieldNames("", "", "", "foo", influenceFieldNames); factory.features({model_t::E_IndividualUniqueCountByBucketAndPerson}); CModelFactory::TDataGathererPtr gatherer(factory.makeDataGatherer(startTime)); BOOST_REQUIRE_EQUAL(0, this->addPerson("p", gatherer, 1, "v")); CModelFactory::TModelPtr modelHolder(factory.makeModel(gatherer)); auto* model = dynamic_cast<CEventRateModel*>(modelHolder.get()); BOOST_TEST_REQUIRE(model); const std::string uniqueValue("str_value"); // Generate some events.
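// This time the split is asymmetric: only the k == 1 value is attributed to
// "inf2" and the remaining 18 values go to "inf1", so "inf1" should dominate
// and "inf2" should fall below the influence cutoff.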
TTimeVec eventTimes; TUInt64Vec expectedEventCounts = rawEventCounts(); generateEvents(startTime, bucketLength, expectedEventCounts, eventTimes); core_t::TTime endTime = (eventTimes.back() / bucketLength + 1) * bucketLength; LOG_DEBUG(<< "startTime = " << startTime << ", endTime = " << endTime << ", # events = " << eventTimes.size()); SAnnotatedProbability::TOptionalStrOptionalStrPrDoublePrVec lastInfluencersResult; std::size_t i{0}; std::size_t j{0}; for (core_t::TTime bucketStartTime = startTime; bucketStartTime < endTime; bucketStartTime += bucketLength, ++j) { core_t::TTime bucketEndTime = bucketStartTime + bucketLength; double count{0.0}; for (; i < eventTimes.size() && eventTimes[i] < bucketEndTime; ++i) { this->addArrival(SMessage(eventTimes[i], "p", {}, {}, "inf1", uniqueValue), gatherer); count += 1.0; } if (i == eventTimes.size()) { // Generate anomaly LOG_DEBUG(<< "Generating anomaly"); for (std::size_t k = 1; k < 20; k++) { std::stringstream ss; ss << uniqueValue << "_" << k; if (k == 1) { this->addArrival(SMessage(eventTimes[i - 1], "p", {}, {}, "inf2", ss.str()), gatherer); } else { this->addArrival(SMessage(eventTimes[i - 1], "p", {}, {}, "inf1", ss.str()), gatherer); } } } LOG_DEBUG(<< "Bucket count = " << count); model->sample(bucketStartTime, bucketEndTime, m_ResourceMonitor); SAnnotatedProbability annotatedProbability; CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING); BOOST_TEST_REQUIRE(model->computeProbability( 0 /*pid*/, bucketStartTime, bucketEndTime, partitioningFields, 1, annotatedProbability)); LOG_DEBUG(<< "probability = " << annotatedProbability.s_Probability); LOG_DEBUG(<< "influencers = " << annotatedProbability.s_Influences); BOOST_TEST_REQUIRE(annotatedProbability.s_Probability); lastInfluencersResult = annotatedProbability.s_Influences; } // The influence should be dominated by the first influencer, and the // minority influencer (inf2) should be dropped by the cutoff threshold BOOST_REQUIRE_EQUAL(1, lastInfluencersResult.size()); BOOST_TEST_REQUIRE(lastInfluencersResult[0].second > 0.8); } { // Test two influence names, two asymmetric influence values in each SModelParams params(bucketLength); params.s_DecayRate = 0.001; auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength); CEventRateModelFactory factory(params, interimBucketCorrector); TStrVec influenceFieldNames{"IF1", "IF2"}; factory.fieldNames("", "", "", "foo", influenceFieldNames); factory.features({model_t::E_IndividualUniqueCountByBucketAndPerson}); CModelFactory::TDataGathererPtr gatherer(factory.makeDataGatherer(startTime)); BOOST_REQUIRE_EQUAL(0, this->addPerson("p", gatherer, 2, "v")); CModelFactory::TModelPtr modelHolder(factory.makeModel(gatherer)); auto* model = dynamic_cast<CEventRateModel*>(modelHolder.get()); BOOST_TEST_REQUIRE(model); const std::string uniqueValue("str_value"); // Generate some events.
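// Note that i equals eventTimes.size() throughout the anomaly loop below, so
// every anomalous value carries the same influencer pair ("inf" for IF1 and
// "inf_another" for IF2; the "_extra" branch is not expected to fire for
// this data set). Both fields should therefore report their single dominant
// value with equal influence.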
TTimeVec eventTimes; TUInt64Vec expectedEventCounts = rawEventCounts(); generateEvents(startTime, bucketLength, expectedEventCounts, eventTimes); core_t::TTime endTime = (eventTimes.back() / bucketLength + 1) * bucketLength; LOG_DEBUG(<< "startTime = " << startTime << ", endTime = " << endTime << ", # events = " << eventTimes.size()); SAnnotatedProbability::TOptionalStrOptionalStrPrDoublePrVec lastInfluencersResult; std::size_t i{0}; std::size_t j{0}; for (core_t::TTime bucketStartTime = startTime; bucketStartTime < endTime; bucketStartTime += bucketLength, ++j) { core_t::TTime bucketEndTime = bucketStartTime + bucketLength; double count{0.0}; for (; i < eventTimes.size() && eventTimes[i] < bucketEndTime; ++i) { this->addArrival(SMessage(eventTimes[i], "p", {}, {}, "inf1", "inf1", uniqueValue), gatherer); count += 1.0; } if (i == eventTimes.size()) { // Generate anomaly LOG_DEBUG(<< "Generating anomaly"); for (std::size_t k = 1; k < 22; k++) { std::stringstream ss1; ss1 << uniqueValue << "_" << k; std::stringstream ss2; ss2 << "inf"; if (i % 10 == 0) { ss2 << "_extra"; } const std::string inf1(ss2.str()); ss2 << "_another"; const std::string inf2(ss2.str()); LOG_DEBUG(<< "Inf1 = " << inf1); LOG_DEBUG(<< "Inf2 = " << inf2); LOG_DEBUG(<< "Value = " << ss1.str()); this->addArrival(SMessage(eventTimes[i - 1], "p", {}, {}, inf1, inf2, ss1.str()), gatherer); } } LOG_DEBUG(<< "Bucket count = " << count); model->sample(bucketStartTime, bucketEndTime, m_ResourceMonitor); SAnnotatedProbability annotatedProbability; CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING); BOOST_TEST_REQUIRE(model->computeProbability( 0 /*pid*/, bucketStartTime, bucketEndTime, partitioningFields, 1, annotatedProbability)); LOG_DEBUG(<< "probability = " << annotatedProbability.s_Probability); LOG_DEBUG(<< "influencers = " << annotatedProbability.s_Influences); BOOST_TEST_REQUIRE(annotatedProbability.s_Probability); lastInfluencersResult = annotatedProbability.s_Influences; } // The influence should be dominated by the first influencer for both fields, and the // _extra influencers should be dropped by the cutoff threshold BOOST_REQUIRE_EQUAL(2, lastInfluencersResult.size()); BOOST_REQUIRE_EQUAL(std::string("IF2"), *lastInfluencersResult[0].first.first); BOOST_REQUIRE_EQUAL(std::string("inf_another"), *lastInfluencersResult[0].first.second); BOOST_REQUIRE_EQUAL(std::string("IF1"), *lastInfluencersResult[1].first.first); BOOST_REQUIRE_EQUAL(std::string("inf"), *lastInfluencersResult[1].first.second); BOOST_REQUIRE_EQUAL(lastInfluencersResult[0].second, lastInfluencersResult[1].second); BOOST_TEST_REQUIRE(lastInfluencersResult[1].second > 0.8); } } BOOST_FIXTURE_TEST_CASE(testRareWithInfluence, CTestFixture) { const core_t::TTime startTime{1346968800}; const core_t::TTime bucketLength{3600}; SModelParams params(bucketLength); auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength); CEventRateModelFactory factory(params, interimBucketCorrector); TStrVec influenceFieldNames{"IF1"}; factory.fieldNames("", "", "", "", influenceFieldNames); factory.features(function_t::features(function_t::E_IndividualRare)); CModelFactory::TDataGathererPtr gatherer(factory.makeDataGatherer(startTime)); BOOST_REQUIRE_EQUAL(0, this->addPerson("p1", gatherer, 1)); BOOST_REQUIRE_EQUAL(1, this->addPerson("p2", gatherer, 1)); BOOST_REQUIRE_EQUAL(2, this->addPerson("p3", gatherer, 1)); BOOST_REQUIRE_EQUAL(3, this->addPerson("p4", gatherer, 1)); BOOST_REQUIRE_EQUAL(4, this->addPerson("p5", gatherer, 1)); 
CModelFactory::TModelPtr modelHolder(factory.makeModel(gatherer)); auto* model = dynamic_cast<CEventRateModel*>(modelHolder.get()); BOOST_TEST_REQUIRE(model); SAnnotatedProbability::TOptionalStrOptionalStrPrDoublePrVec lastInfluencersResult; core_t::TTime time{startTime}; for (/**/; time < startTime + 50 * bucketLength; time += bucketLength) { this->addArrival(SMessage(time + bucketLength / 2, "p1", {}, {}, "inf1"), gatherer); this->addArrival(SMessage(time + bucketLength / 2, "p2", {}, {}, "inf1"), gatherer); this->addArrival(SMessage(time + bucketLength / 2, "p3", {}, {}, "inf1"), gatherer); this->addArrival(SMessage(time + bucketLength / 2, "p4", {}, {}, "inf1"), gatherer); model->sample(time, time + bucketLength, m_ResourceMonitor); } { this->addArrival(SMessage(time + bucketLength / 2, "p1", {}, {}, "inf1"), gatherer); this->addArrival(SMessage(time + bucketLength / 2, "p2", {}, {}, "inf1"), gatherer); this->addArrival(SMessage(time + bucketLength / 2, "p3", {}, {}, "inf1"), gatherer); this->addArrival(SMessage(time + bucketLength / 2, "p4", {}, {}, "inf1"), gatherer); this->addArrival(SMessage(time + bucketLength / 2, "p5", {}, {}, "inf2"), gatherer); } model->sample(time, time + bucketLength, m_ResourceMonitor); TDoubleVec probabilities; for (std::size_t pid = 0; pid < 5; ++pid) { SAnnotatedProbability annotatedProbability; CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING); BOOST_TEST_REQUIRE(model->computeProbability( pid, time, time + bucketLength, partitioningFields, 1, annotatedProbability)); LOG_DEBUG(<< "probability = " << annotatedProbability.s_Probability); LOG_DEBUG(<< "influencers = " << annotatedProbability.s_Influences); lastInfluencersResult = annotatedProbability.s_Influences; probabilities.push_back(annotatedProbability.s_Probability); } // We expect "p1 = p2 = p3 = p4 >> p5". 
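// p1-p4 have been seen in every training bucket whereas p5 first appears in
// the final bucket only, so p5's rarity probability should be much smaller;
// the factor of 50 below is a loose bound on "much smaller".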
BOOST_REQUIRE_EQUAL(5, probabilities.size()); BOOST_REQUIRE_EQUAL(probabilities[0], probabilities[1]); BOOST_REQUIRE_EQUAL(probabilities[1], probabilities[2]); BOOST_REQUIRE_EQUAL(probabilities[2], probabilities[3]); BOOST_TEST_REQUIRE(probabilities[3] > 50.0 * probabilities[4]); // Expect the influence for this anomaly to be "IF1":"inf2" LOG_DEBUG(<< lastInfluencersResult); BOOST_REQUIRE_EQUAL(1, lastInfluencersResult.size()); BOOST_TEST_REQUIRE(lastInfluencersResult[0].second > 0.75); BOOST_REQUIRE_EQUAL(std::string("IF1"), *lastInfluencersResult[0].first.first); BOOST_REQUIRE_EQUAL(std::string("inf2"), *lastInfluencersResult[0].first.second); std::ostringstream origJson; core::CJsonStatePersistInserter::persist( origJson, [&model](core::CJsonStatePersistInserter& inserter) { model->acceptPersistInserter(inserter); }); // Restore the JSON into a new model // The traverser expects the state JSON in an embedded document std::istringstream origJsonStrm{"{\"topLevel\" : " + origJson.str() + "}"}; core::CJsonStateRestoreTraverser traverser(origJsonStrm); CModelFactory::TModelPtr restoredModelPtr(factory.makeModel(gatherer, traverser)); // The JSON representation of the restored model should be the same as the original std::ostringstream newJson; core::CJsonStatePersistInserter::persist( newJson, [&restoredModelPtr](core::CJsonStatePersistInserter& inserter) { restoredModelPtr->acceptPersistInserter(inserter); }); BOOST_REQUIRE_EQUAL(origJson.str(), newJson.str()); } BOOST_FIXTURE_TEST_CASE(testSkipSampling, CTestFixture) { core_t::TTime startTime{100}; std::size_t bucketLength{100}; std::size_t maxAgeBuckets{5}; SModelParams params(bucketLength); params.s_InitialDecayRateMultiplier = 1.0; model_t::EFeature feature{model_t::E_IndividualCountByBucketAndPerson}; model_t::TFeatureVec features{feature}; CModelFactory::TDataGathererPtr gathererNoGap; CModelFactory::TModelPtr modelNoGap_; this->makeModelT<CEventRateModelFactory>(params, features, startTime, model_t::E_EventRateOnline, gathererNoGap, modelNoGap_); auto* modelNoGap = dynamic_cast<CEventRateModel*>(modelNoGap_.get()); for (std::size_t i = 0; i < 2; ++i) { BOOST_REQUIRE_EQUAL( i, this->addPerson("p" + core::CStringUtils::typeToString(i + 1), gathererNoGap)); } // p1: |1|1|1| // p2: |1|0|0| this->addArrival(SMessage(100, "p1", TOptionalDouble()), gathererNoGap); this->addArrival(SMessage(100, "p2", TOptionalDouble()), gathererNoGap); modelNoGap->sample(100, 200, m_ResourceMonitor); this->addArrival(SMessage(200, "p1", TOptionalDouble()), gathererNoGap); modelNoGap->sample(200, 300, m_ResourceMonitor); this->addArrival(SMessage(300, "p1", TOptionalDouble()), gathererNoGap); modelNoGap->sample(300, 400, m_ResourceMonitor); CModelFactory::TDataGathererPtr gathererWithGap; CModelFactory::TModelPtr modelWithGap_; this->makeModelT<CEventRateModelFactory>(params, features, startTime, model_t::E_EventRateOnline, gathererWithGap, modelWithGap_); auto* modelWithGap = dynamic_cast<CEventRateModel*>(modelWithGap_.get()); for (std::size_t i = 0; i < 2; ++i) { BOOST_REQUIRE_EQUAL( i, this->addPerson("p" + core::CStringUtils::typeToString(i + 1), gathererWithGap)); } // p1: |1|1|0|0|0|0|0|0|0|0|1|1| // p1: |1|X|X|X|X|X|X|X|X|X|1|1| -> equal to |1|1|1| // p2: |1|1|0|0|0|0|0|0|0|0|0|0| // p2: |1|X|X|X|X|X|X|X|X|X|0|0| -> equal to |1|0|0| // where X means skipped bucket this->addArrival(SMessage(100, "p1", TOptionalDouble()), gathererWithGap); this->addArrival(SMessage(100, "p2", TOptionalDouble()), gathererWithGap); modelWithGap->sample(100, 200,
m_ResourceMonitor); this->addArrival(SMessage(200, "p1", TOptionalDouble()), gathererWithGap); this->addArrival(SMessage(200, "p2", TOptionalDouble()), gathererWithGap); modelWithGap->skipSampling(1000); LOG_DEBUG(<< "Calling sample over skipped interval should do nothing except print some ERRORs"); modelWithGap->sample(200, 1000, m_ResourceMonitor); // Check prune does not remove people because last seen times are updated by adding gap duration modelWithGap->prune(maxAgeBuckets); BOOST_REQUIRE_EQUAL(2, gathererWithGap->numberActivePeople()); this->addArrival(SMessage(1000, "p1", TOptionalDouble()), gathererWithGap); modelWithGap->sample(1000, 1100, m_ResourceMonitor); this->addArrival(SMessage(1100, "p1", TOptionalDouble()), gathererWithGap); modelWithGap->sample(1100, 1200, m_ResourceMonitor); // Check priors are the same BOOST_REQUIRE_EQUAL( static_cast<const maths::time_series::CUnivariateTimeSeriesModel*>( modelWithGap->details()->model(feature, 0)) ->residualModel() .checksum(), static_cast<const maths::time_series::CUnivariateTimeSeriesModel*>( modelNoGap->details()->model(feature, 0)) ->residualModel() .checksum()); BOOST_REQUIRE_EQUAL( static_cast<const maths::time_series::CUnivariateTimeSeriesModel*>( modelWithGap->details()->model(feature, 1)) ->residualModel() .checksum(), static_cast<const maths::time_series::CUnivariateTimeSeriesModel*>( modelNoGap->details()->model(feature, 1)) ->residualModel() .checksum()); // Confirm last seen times are only updated by gap duration by forcing p2 to be pruned modelWithGap->sample(1200, 1500, m_ResourceMonitor); modelWithGap->prune(maxAgeBuckets); // Age at this point will be 500 and since it's equal to maxAge it should still be here BOOST_REQUIRE_EQUAL(2, gathererWithGap->numberActivePeople()); modelWithGap->sample(1500, 1600, m_ResourceMonitor); modelWithGap->prune(maxAgeBuckets); // Age at this point will be 600 so it should get pruned BOOST_REQUIRE_EQUAL(1, gathererWithGap->numberActivePeople()); } BOOST_FIXTURE_TEST_CASE(testExplicitNulls, CTestFixture) { core_t::TTime startTime{100}; std::size_t bucketLength{100}; std::string summaryCountField{"count"}; SModelParams params(bucketLength); params.s_InitialDecayRateMultiplier = 1.0; model_t::EFeature feature{model_t::E_IndividualNonZeroCountByBucketAndPerson}; model_t::TFeatureVec features{feature}; CModelFactory::TDataGathererPtr gathererSkipGap; CModelFactory::TModelPtr modelSkipGap_; this->makeModelT<CEventRateModelFactory>(params, features, startTime, model_t::E_EventRateOnline, gathererSkipGap, modelSkipGap_, {}, summaryCountField); auto* modelSkipGap = dynamic_cast<CEventRateModel*>(modelSkipGap_.get()); // The idea here is to compare a model that has a gap skipped against a model // that has explicit nulls for the buckets where sampling was skipped.
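// In the bucket diagrams below X marks a bucket for which sampling was
// skipped (or, for the second model, an explicit null record was received).
// If explicit nulls are handled like skipped buckets the two models should
// end up with identical residual model checksums.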
// p1: |1|1|1|X|X|1| // p2: |1|1|0|X|X|0| this->addArrival(SMessage(100, "p1", {}, {}, std::nullopt, std::nullopt, "1"), gathererSkipGap); this->addArrival(SMessage(100, "p2", {}, {}, std::nullopt, std::nullopt, "1"), gathererSkipGap); modelSkipGap->sample(100, 200, m_ResourceMonitor); this->addArrival(SMessage(200, "p1", {}, {}, std::nullopt, std::nullopt, "1"), gathererSkipGap); this->addArrival(SMessage(200, "p2", {}, {}, std::nullopt, std::nullopt, "1"), gathererSkipGap); modelSkipGap->sample(200, 300, m_ResourceMonitor); this->addArrival(SMessage(300, "p1", {}, {}, std::nullopt, std::nullopt, "1"), gathererSkipGap); modelSkipGap->sample(300, 400, m_ResourceMonitor); modelSkipGap->skipSampling(600); this->addArrival(SMessage(600, "p1", {}, {}, std::nullopt, std::nullopt, "1"), gathererSkipGap); modelSkipGap->sample(600, 700, m_ResourceMonitor); CModelFactory::TDataGathererPtr gathererExNull; CModelFactory::TModelPtr modelExNullGap_; this->makeModelT<CEventRateModelFactory>(params, features, startTime, model_t::E_EventRateOnline, gathererExNull, modelExNullGap_, {}, summaryCountField); auto* modelExNullGap = dynamic_cast<CEventRateModel*>(modelExNullGap_.get()); // p1: |1,"",null|1|1|null|null|1| // p2: |1,""|1|0|null|null|0| this->addArrival(SMessage(100, "p1", {}, {}, std::nullopt, std::nullopt, "1"), gathererExNull); this->addArrival(SMessage(100, "p1", {}, {}, std::nullopt, std::nullopt, ""), gathererExNull); this->addArrival(SMessage(100, "p1", {}, {}, std::nullopt, std::nullopt, "null"), gathererExNull); this->addArrival(SMessage(100, "p2", {}, {}, std::nullopt, std::nullopt, "1"), gathererExNull); this->addArrival(SMessage(100, "p2", {}, {}, std::nullopt, std::nullopt, ""), gathererExNull); modelExNullGap->sample(100, 200, m_ResourceMonitor); this->addArrival(SMessage(200, "p1", {}, {}, std::nullopt, std::nullopt, "1"), gathererExNull); this->addArrival(SMessage(200, "p2", {}, {}, std::nullopt, std::nullopt, "1"), gathererExNull); modelExNullGap->sample(200, 300, m_ResourceMonitor); this->addArrival(SMessage(300, "p1", {}, {}, std::nullopt, std::nullopt, "1"), gathererExNull); modelExNullGap->sample(300, 400, m_ResourceMonitor); this->addArrival(SMessage(400, "p1", {}, {}, std::nullopt, std::nullopt, "null"), gathererExNull); this->addArrival(SMessage(400, "p2", {}, {}, std::nullopt, std::nullopt, "null"), gathererExNull); modelExNullGap->sample(400, 500, m_ResourceMonitor); this->addArrival(SMessage(500, "p1", {}, {}, std::nullopt, std::nullopt, "null"), gathererExNull); this->addArrival(SMessage(500, "p2", {}, {}, std::nullopt, std::nullopt, "null"), gathererExNull); modelExNullGap->sample(500, 600, m_ResourceMonitor); this->addArrival(SMessage(600, "p1", {}, {}, std::nullopt, std::nullopt, "1"), gathererExNull); modelExNullGap->sample(600, 700, m_ResourceMonitor); // Check priors are the same BOOST_REQUIRE_EQUAL( static_cast<const maths::time_series::CUnivariateTimeSeriesModel*>( modelExNullGap->details()->model(feature, 0)) ->residualModel() .checksum(), static_cast<const maths::time_series::CUnivariateTimeSeriesModel*>( modelSkipGap->details()->model(feature, 0)) ->residualModel() .checksum()); BOOST_REQUIRE_EQUAL( static_cast<const maths::time_series::CUnivariateTimeSeriesModel*>( modelExNullGap->details()->model(feature, 1)) ->residualModel() .checksum(), static_cast<const maths::time_series::CUnivariateTimeSeriesModel*>( modelSkipGap->details()->model(feature, 1)) ->residualModel() .checksum()); } BOOST_FIXTURE_TEST_CASE(testInterimCorrections, CTestFixture) { 
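// Train three people on counts in [50, 70] per bucket, then query an interim
// (incomplete) bucket via sampleBucketStatistics(). Loosely speaking, the
// interim bucket corrector extrapolates a partial count c observed a fraction
// f of the way through the bucket to roughly c / f before comparison with the
// baseline; this is a simplification of the actual correction, which is
// driven by the counting model's bucket completeness estimate.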
core_t::TTime startTime{3600}; core_t::TTime bucketLength{3600}; core_t::TTime endTime{2 * 24 * bucketLength}; SModelParams params(bucketLength); params.s_InitialDecayRateMultiplier = 1.0; params.s_MultibucketFeaturesWindowLength = 0; this->makeModel(params, {model_t::E_IndividualCountByBucketAndPerson}, startTime, 3); auto* model = dynamic_cast<CEventRateModel*>(m_Model.get()); CCountingModel countingModel(params, m_Gatherer, m_InterimBucketCorrector); test::CRandomNumbers rng; core_t::TTime now{startTime}; TDoubleVec samples(3, 0.0); while (now < endTime) { rng.generateUniformSamples(50.0, 70.0, 3, samples); for (std::size_t i = 0; i < static_cast<std::size_t>(samples[0] + 0.5); ++i) { this->addArrival(SMessage(now, "p1", TOptionalDouble()), m_Gatherer); } for (std::size_t i = 0; i < static_cast<std::size_t>(samples[1] + 0.5); ++i) { this->addArrival(SMessage(now, "p2", TOptionalDouble()), m_Gatherer); } for (std::size_t i = 0; i < static_cast<std::size_t>(samples[2] + 0.5); ++i) { this->addArrival(SMessage(now, "p3", TOptionalDouble()), m_Gatherer); } countingModel.sample(now, now + bucketLength, m_ResourceMonitor); model->sample(now, now + bucketLength, m_ResourceMonitor); now += bucketLength; } for (std::size_t i = 0; i < 35; ++i) { this->addArrival(SMessage(now, "p1", TOptionalDouble()), m_Gatherer); } for (std::size_t i = 0; i < 1; ++i) { this->addArrival(SMessage(now, "p2", TOptionalDouble()), m_Gatherer); } for (std::size_t i = 0; i < 100; ++i) { this->addArrival(SMessage(now, "p3", TOptionalDouble()), m_Gatherer); } countingModel.sampleBucketStatistics(now, now + bucketLength, m_ResourceMonitor); model->sampleBucketStatistics(now, now + bucketLength, m_ResourceMonitor); CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING); model_t::CResultType type(model_t::CResultType::E_Unconditional | model_t::CResultType::E_Interim); SAnnotatedProbability annotatedProbability1; annotatedProbability1.s_ResultType = type; BOOST_TEST_REQUIRE(model->computeProbability( 0 /*pid*/, now, now + bucketLength, partitioningFields, 1, annotatedProbability1)); SAnnotatedProbability annotatedProbability2; annotatedProbability2.s_ResultType = type; BOOST_TEST_REQUIRE(model->computeProbability( 1 /*pid*/, now, now + bucketLength, partitioningFields, 1, annotatedProbability2)); SAnnotatedProbability annotatedProbability3; annotatedProbability3.s_ResultType = type; BOOST_TEST_REQUIRE(model->computeProbability( 2 /*pid*/, now, now + bucketLength, partitioningFields, 1, annotatedProbability3)); TDouble1Vec p1Baseline = model->baselineBucketMean( model_t::E_IndividualCountByBucketAndPerson, 0, 0, type, NO_CORRELATES, now); TDouble1Vec p2Baseline = model->baselineBucketMean( model_t::E_IndividualCountByBucketAndPerson, 1, 0, type, NO_CORRELATES, now); TDouble1Vec p3Baseline = model->baselineBucketMean( model_t::E_IndividualCountByBucketAndPerson, 2, 0, type, NO_CORRELATES, now); LOG_DEBUG(<< "p1 probability = " << annotatedProbability1.s_Probability); LOG_DEBUG(<< "p2 probability = " << annotatedProbability2.s_Probability); LOG_DEBUG(<< "p3 probability = " << annotatedProbability3.s_Probability); LOG_DEBUG(<< "p1 baseline = " << p1Baseline[0]); LOG_DEBUG(<< "p2 baseline = " << p2Baseline[0]); LOG_DEBUG(<< "p3 baseline = " << p3Baseline[0]); BOOST_TEST_REQUIRE(annotatedProbability1.s_Probability > 0.05); BOOST_TEST_REQUIRE(annotatedProbability2.s_Probability < 0.05); BOOST_TEST_REQUIRE(annotatedProbability3.s_Probability < 0.05); BOOST_TEST_REQUIRE(p1Baseline[0] > 44.0); 
BOOST_TEST_REQUIRE(p1Baseline[0] < 46.0); BOOST_TEST_REQUIRE(p2Baseline[0] > 43.0); BOOST_TEST_REQUIRE(p2Baseline[0] < 47.0); BOOST_TEST_REQUIRE(p3Baseline[0] > 57.0); BOOST_TEST_REQUIRE(p3Baseline[0] < 62.0); for (std::size_t i = 0; i < 25; ++i) { this->addArrival(SMessage(now, "p1", TOptionalDouble()), m_Gatherer); } for (std::size_t i = 0; i < 59; ++i) { this->addArrival(SMessage(now, "p2", TOptionalDouble()), m_Gatherer); } for (std::size_t i = 0; i < 100; ++i) { this->addArrival(SMessage(now, "p3", TOptionalDouble()), m_Gatherer); } countingModel.sampleBucketStatistics(now, now + bucketLength, m_ResourceMonitor); model->sampleBucketStatistics(now, now + bucketLength, m_ResourceMonitor); BOOST_TEST_REQUIRE(model->computeProbability( 0 /*pid*/, now, now + bucketLength, partitioningFields, 0, annotatedProbability1)); BOOST_TEST_REQUIRE(model->computeProbability( 1 /*pid*/, now, now + bucketLength, partitioningFields, 0, annotatedProbability2)); BOOST_TEST_REQUIRE(model->computeProbability( 2 /*pid*/, now, now + bucketLength, partitioningFields, 0, annotatedProbability3)); p1Baseline = model->baselineBucketMean(model_t::E_IndividualCountByBucketAndPerson, 0, 0, type, NO_CORRELATES, now); p2Baseline = model->baselineBucketMean(model_t::E_IndividualCountByBucketAndPerson, 1, 0, type, NO_CORRELATES, now); p3Baseline = model->baselineBucketMean(model_t::E_IndividualCountByBucketAndPerson, 2, 0, type, NO_CORRELATES, now); LOG_DEBUG(<< "p1 probability = " << annotatedProbability1.s_Probability); LOG_DEBUG(<< "p2 probability = " << annotatedProbability2.s_Probability); LOG_DEBUG(<< "p3 probability = " << annotatedProbability3.s_Probability); LOG_DEBUG(<< "p1 baseline = " << p1Baseline[0]); LOG_DEBUG(<< "p2 baseline = " << p2Baseline[0]); LOG_DEBUG(<< "p3 baseline = " << p3Baseline[0]); BOOST_TEST_REQUIRE(annotatedProbability1.s_Probability > 0.75); BOOST_TEST_REQUIRE(annotatedProbability2.s_Probability > 0.9); BOOST_TEST_REQUIRE(annotatedProbability3.s_Probability < 0.05); BOOST_TEST_REQUIRE(p1Baseline[0] > 58.0); BOOST_TEST_REQUIRE(p1Baseline[0] < 62.0); BOOST_TEST_REQUIRE(p2Baseline[0] > 58.0); BOOST_TEST_REQUIRE(p2Baseline[0] < 62.0); BOOST_TEST_REQUIRE(p3Baseline[0] > 58.0); BOOST_TEST_REQUIRE(p3Baseline[0] < 62.0); } BOOST_FIXTURE_TEST_CASE(testInterimCorrectionsWithCorrelations, CTestFixture) { core_t::TTime startTime{3600}; core_t::TTime bucketLength{3600}; SModelParams params(bucketLength); params.s_MultivariateByFields = true; this->makeModel(params, {model_t::E_IndividualCountByBucketAndPerson}, startTime, 3); auto* model = dynamic_cast<CEventRateModel*>(m_Model.get()); core_t::TTime now{startTime}; core_t::TTime endTime{now + 2 * 24 * bucketLength}; test::CRandomNumbers rng; TDoubleVec samples(1, 0.0); while (now < endTime) { rng.generateUniformSamples(80.0, 100.0, 1, samples); for (std::size_t i = 0; i < static_cast<std::size_t>(samples[0] + 0.5); ++i) { this->addArrival(SMessage(now, "p1", TOptionalDouble()), m_Gatherer); } for (std::size_t i = 0; i < static_cast<std::size_t>(samples[0] + 10.5); ++i) { this->addArrival(SMessage(now, "p2", TOptionalDouble()), m_Gatherer); } for (std::size_t i = 0; i < static_cast<std::size_t>(samples[0] - 9.5); ++i) { this->addArrival(SMessage(now, "p3", TOptionalDouble()), m_Gatherer); } model->sample(now, now + bucketLength, m_ResourceMonitor); now += bucketLength; } for (std::size_t i = 0; i < 9; ++i) { this->addArrival(SMessage(now, "p1", TOptionalDouble()), m_Gatherer); } for (std::size_t i = 0; i < 10; ++i) { this->addArrival(SMessage(now, 
"p2", TOptionalDouble()), m_Gatherer); } for (std::size_t i = 0; i < 8; ++i) { this->addArrival(SMessage(now, "p3", TOptionalDouble()), m_Gatherer); } model->sampleBucketStatistics(now, now + bucketLength, m_ResourceMonitor); CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING); model_t::CResultType type(model_t::CResultType::E_Conditional | model_t::CResultType::E_Interim); SAnnotatedProbability annotatedProbability1; annotatedProbability1.s_ResultType = type; BOOST_TEST_REQUIRE(model->computeProbability( 0 /*pid*/, now, now + bucketLength, partitioningFields, 1, annotatedProbability1)); SAnnotatedProbability annotatedProbability2; annotatedProbability2.s_ResultType = type; BOOST_TEST_REQUIRE(model->computeProbability( 1 /*pid*/, now, now + bucketLength, partitioningFields, 1, annotatedProbability2)); SAnnotatedProbability annotatedProbability3; annotatedProbability3.s_ResultType = type; BOOST_TEST_REQUIRE(model->computeProbability( 2 /*pid*/, now, now + bucketLength, partitioningFields, 1, annotatedProbability3)); TDouble1Vec p1Baseline = model->baselineBucketMean( model_t::E_IndividualCountByBucketAndPerson, 0, 0, type, annotatedProbability1.s_AttributeProbabilities[0].s_Correlated, now); TDouble1Vec p2Baseline = model->baselineBucketMean( model_t::E_IndividualCountByBucketAndPerson, 1, 0, type, annotatedProbability2.s_AttributeProbabilities[0].s_Correlated, now); TDouble1Vec p3Baseline = model->baselineBucketMean( model_t::E_IndividualCountByBucketAndPerson, 2, 0, type, annotatedProbability3.s_AttributeProbabilities[0].s_Correlated, now); LOG_DEBUG(<< "p1 probability = " << annotatedProbability1.s_Probability); LOG_DEBUG(<< "p2 probability = " << annotatedProbability2.s_Probability); LOG_DEBUG(<< "p3 probability = " << annotatedProbability3.s_Probability); LOG_DEBUG(<< "p1 baseline = " << p1Baseline[0]); LOG_DEBUG(<< "p2 baseline = " << p2Baseline[0]); LOG_DEBUG(<< "p3 baseline = " << p3Baseline[0]); BOOST_TEST_REQUIRE(annotatedProbability1.s_Probability > 0.7); BOOST_TEST_REQUIRE(annotatedProbability2.s_Probability > 0.7); BOOST_TEST_REQUIRE(annotatedProbability3.s_Probability > 0.7); BOOST_TEST_REQUIRE(p1Baseline[0] > 8.4); BOOST_TEST_REQUIRE(p1Baseline[0] < 8.6); BOOST_TEST_REQUIRE(p2Baseline[0] > 9.4); BOOST_TEST_REQUIRE(p2Baseline[0] < 9.6); BOOST_TEST_REQUIRE(p3Baseline[0] > 7.4); BOOST_TEST_REQUIRE(p3Baseline[0] < 7.6); } BOOST_FIXTURE_TEST_CASE(testSummaryCountZeroRecordsAreIgnored, CTestFixture) { core_t::TTime startTime{100}; core_t::TTime bucketLength{100}; SModelParams params(bucketLength); std::string summaryCountField{"count"}; CModelFactory::TDataGathererPtr gathererWithZeros; CModelFactory::TModelPtr modelWithZerosPtr; this->makeModelT<CEventRateModelFactory>( params, {model_t::E_IndividualCountByBucketAndPerson}, startTime, model_t::E_EventRateOnline, gathererWithZeros, modelWithZerosPtr, {}, summaryCountField); CEventRateModel& modelWithZeros = static_cast<CEventRateModel&>(*modelWithZerosPtr); CModelFactory::TDataGathererPtr gathererNoZeros; CModelFactory::TModelPtr modelNoZerosPtr; this->makeModelT<CEventRateModelFactory>( params, {model_t::E_IndividualCountByBucketAndPerson}, startTime, model_t::E_EventRateOnline, gathererNoZeros, modelNoZerosPtr, {}, summaryCountField); CEventRateModel& modelNoZeros = static_cast<CEventRateModel&>(*modelNoZerosPtr); // The idea here is to compare a model that has records with summary count of zero // against a model that has no records at all where the first model had the zero-count records. 
core_t::TTime now{100}; core_t::TTime end{now + 50 * bucketLength}; test::CRandomNumbers rng; TSizeVec samples; TDoubleVec zeroCountProbability; std::string summaryCountZero{"0"}; std::string summaryCountOne{"1"}; while (now < end) { rng.generateUniformSamples(1, 10, 1, samples); rng.generateUniformSamples(0.0, 1.0, 1, zeroCountProbability); for (std::size_t i = 0; i < samples[0]; ++i) { if (zeroCountProbability[0] < 0.2) { this->addArrival(SMessage(now, "p1", {}, {}, std::nullopt, std::nullopt, summaryCountZero), gathererWithZeros); } else { this->addArrival(SMessage(now, "p1", {}, {}, std::nullopt, std::nullopt, summaryCountOne), gathererWithZeros); this->addArrival(SMessage(now, "p1", {}, {}, std::nullopt, std::nullopt, summaryCountOne), gathererNoZeros); } } modelWithZeros.sample(now, now + bucketLength, m_ResourceMonitor); modelNoZeros.sample(now, now + bucketLength, m_ResourceMonitor); now += bucketLength; } BOOST_REQUIRE_EQUAL(modelWithZeros.checksum(), modelNoZeros.checksum()); } BOOST_FIXTURE_TEST_CASE(testComputeProbabilityGivenDetectionRule, CTestFixture) { CRuleCondition condition; condition.appliesTo(CRuleCondition::E_Actual); condition.op(CRuleCondition::E_LT); condition.value(100.0); CDetectionRule rule; rule.addCondition(condition); core_t::TTime startTime{3600}; core_t::TTime bucketLength{3600}; core_t::TTime endTime{24 * bucketLength}; SModelParams params(bucketLength); SModelParams::TDetectionRuleVec rules{rule}; params.s_DetectionRules = SModelParams::TDetectionRuleVecCRef(rules); this->makeModel(params, {model_t::E_IndividualCountByBucketAndPerson}, startTime, 1); auto* model = dynamic_cast<CEventRateModel*>(m_Model.get()); test::CRandomNumbers rng; core_t::TTime now = startTime; TDoubleVec samples(1, 0.0); while (now < endTime) { rng.generateUniformSamples(50.0, 70.0, 1, samples); for (std::size_t i = 0; i < static_cast<std::size_t>(samples[0] + 0.5); ++i) { this->addArrival(SMessage(now, "p1", TOptionalDouble()), m_Gatherer); } model->sample(now, now + bucketLength, m_ResourceMonitor); now += bucketLength; } for (std::size_t i = 0; i < 35; ++i) { this->addArrival(SMessage(now, "p1", TOptionalDouble()), m_Gatherer); } model->sampleBucketStatistics(now, now + bucketLength, m_ResourceMonitor); CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING); SAnnotatedProbability annotatedProbability; BOOST_TEST_REQUIRE(model->computeProbability( 0 /*pid*/, now, now + bucketLength, partitioningFields, 1, annotatedProbability)); BOOST_REQUIRE_CLOSE_ABSOLUTE(annotatedProbability.s_Probability, 1.0, 0.00001); } BOOST_FIXTURE_TEST_CASE(testDecayRateControl, CTestFixture) { core_t::TTime startTime{0}; core_t::TTime bucketLength{1800}; model_t::EFeature feature = model_t::E_IndividualCountByBucketAndPerson; model_t::TFeatureVec features{feature}; SModelParams params(bucketLength); params.s_DecayRate = 0.001; params.s_MinimumModeFraction = CAnomalyDetectorModelConfig::DEFAULT_INDIVIDUAL_MINIMUM_MODE_FRACTION; test::CRandomNumbers rng; LOG_DEBUG(<< "*** Test anomaly ***"); { // Test we don't adapt the decay rate if there is a short-lived // anomaly. We should get essentially identical prediction errors // with and without decay control. 
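// The rate is lifted by 20 for just the four hours following week 3, so any
// decay rate adaptation to it would be a mistake; the two models' mean
// prediction errors are therefore required to agree to within 0.01.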
params.s_ControlDecayRate = true; params.s_DecayRate = 0.001; auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength); CEventRateModelFactory factory(params, interimBucketCorrector); factory.features(features); CModelFactory::TDataGathererPtr gatherer{factory.makeDataGatherer(startTime)}; CModelFactory::TModelPtr model{factory.makeModel(gatherer)}; this->addPerson("p1", gatherer); params.s_ControlDecayRate = false; params.s_DecayRate = 0.001; CEventRateModelFactory referenceFactory(params, interimBucketCorrector); referenceFactory.features(features); CModelFactory::TDataGathererPtr referenceGatherer{ referenceFactory.makeDataGatherer(startTime)}; CModelFactory::TModelPtr referenceModel{referenceFactory.makeModel(referenceGatherer)}; this->addPerson("p1", referenceGatherer); TMeanAccumulator meanPredictionError; TMeanAccumulator meanReferencePredictionError; model_t::CResultType type(model_t::CResultType::E_Unconditional | model_t::CResultType::E_Interim); for (core_t::TTime t = 0; t < 4 * core::constants::WEEK; t += bucketLength) { if (t % core::constants::WEEK == 0) { LOG_DEBUG(<< "week " << t / core::constants::WEEK + 1); } TDoubleVec rate; rng.generateUniformSamples(0.0, 10.0, 1, rate); rate[0] += 20.0 * (t > 3 * core::constants::WEEK && t < 3 * core::constants::WEEK + 4 * 3600 ? 1.0 : 0.0); for (std::size_t i = 0; i < static_cast<std::size_t>(rate[0]); ++i) { this->addArrival( SMessage(t + bucketLength / 2, "p1", TOptionalDouble()), gatherer); this->addArrival(SMessage(t + bucketLength / 2, "p1", TOptionalDouble()), referenceGatherer); } model->sample(t, t + bucketLength, m_ResourceMonitor); referenceModel->sample(t, t + bucketLength, m_ResourceMonitor); meanPredictionError.add(std::fabs( model->currentBucketValue(feature, 0, 0, t + bucketLength / 2)[0] - model->baselineBucketMean(feature, 0, 0, type, NO_CORRELATES, t + bucketLength / 2)[0])); meanReferencePredictionError.add(std::fabs( referenceModel->currentBucketValue(feature, 0, 0, t + bucketLength / 2)[0] - referenceModel->baselineBucketMean(feature, 0, 0, type, NO_CORRELATES, t + bucketLength / 2)[0])); } LOG_DEBUG(<< "mean = " << maths::common::CBasicStatistics::mean(meanPredictionError)); LOG_DEBUG(<< "reference = " << maths::common::CBasicStatistics::mean(meanReferencePredictionError)); BOOST_REQUIRE_CLOSE_ABSOLUTE( maths::common::CBasicStatistics::mean(meanReferencePredictionError), maths::common::CBasicStatistics::mean(meanPredictionError), 0.01); } LOG_DEBUG(<< "*** Test linear scaling ***"); { // This change point is amongst those we explicitly detect so // check we get similar detection performance with and without // decay rate control.
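// The event rate doubles at week 5. Step changes like this are detected as
// explicit change points, so decay rate control should not be needed here
// and the mean prediction errors are required to agree to within 0.05.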
params.s_ControlDecayRate = true; params.s_DecayRate = 0.001; auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength); CEventRateModelFactory factory(params, interimBucketCorrector); factory.features(features); CModelFactory::TDataGathererPtr gatherer{factory.makeDataGatherer(startTime)}; CModelFactory::TModelPtr model{factory.makeModel(gatherer)}; this->addPerson("p1", gatherer); params.s_ControlDecayRate = false; params.s_DecayRate = 0.001; CEventRateModelFactory referenceFactory(params, interimBucketCorrector); referenceFactory.features(features); CModelFactory::TDataGathererPtr referenceGatherer{referenceFactory.makeDataGatherer(startTime)}; CModelFactory::TModelPtr referenceModel{referenceFactory.makeModel(referenceGatherer)}; this->addPerson("p1", referenceGatherer); TMeanAccumulator meanPredictionError; TMeanAccumulator meanReferencePredictionError; model_t::CResultType type(model_t::CResultType::E_Unconditional | model_t::CResultType::E_Interim); for (core_t::TTime t = 0; t < 10 * core::constants::WEEK; t += bucketLength) { if (t % core::constants::WEEK == 0) { LOG_DEBUG(<< "week " << t / core::constants::WEEK + 1); } double rate = 10.0 * (1.0 + std::sin(boost::math::double_constants::two_pi * static_cast<double>(t) / static_cast<double>(core::constants::DAY))) * (t < 5 * core::constants::WEEK ? 1.0 : 2.0); TDoubleVec noise; rng.generateUniformSamples(0.0, 3.0, 1, noise); for (std::size_t i = 0; i < static_cast<std::size_t>(rate + noise[0]); ++i) { this->addArrival( SMessage(t + bucketLength / 2, "p1", TOptionalDouble()), gatherer); this->addArrival(SMessage(t + bucketLength / 2, "p1", TOptionalDouble()), referenceGatherer); } model->sample(t, t + bucketLength, m_ResourceMonitor); referenceModel->sample(t, t + bucketLength, m_ResourceMonitor); meanPredictionError.add(std::fabs( model->currentBucketValue(feature, 0, 0, t + bucketLength / 2)[0] - model->baselineBucketMean(feature, 0, 0, type, NO_CORRELATES, t + bucketLength / 2)[0])); meanReferencePredictionError.add(std::fabs( referenceModel->currentBucketValue(feature, 0, 0, t + bucketLength / 2)[0] - referenceModel->baselineBucketMean(feature, 0, 0, type, NO_CORRELATES, t + bucketLength / 2)[0])); } LOG_DEBUG(<< "mean = " << maths::common::CBasicStatistics::mean(meanPredictionError)); LOG_DEBUG(<< "reference = " << maths::common::CBasicStatistics::mean(meanReferencePredictionError)); BOOST_REQUIRE_CLOSE_ABSOLUTE( maths::common::CBasicStatistics::mean(meanReferencePredictionError), maths::common::CBasicStatistics::mean(meanPredictionError), 0.05); } LOG_DEBUG(<< "*** Test unmodelled cyclic component ***"); { // This modulates the event rate using a sine with period 10 weeks; // effectively there are significant "manoeuvres" in the event rate // every 5 weeks at the function turning points. We check we get a // significant reduction in the prediction error with decay rate // control.
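// The daily cycle is modulated by a 10 week sine that the model does not
// represent explicitly. Decay rate control should age out the stale level
// faster around the turning points, so the controlled model's mean
// prediction error is required to be at least 20% lower than the reference.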
params.s_ControlDecayRate = true; params.s_DecayRate = 0.0005; auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength); CEventRateModelFactory factory(params, interimBucketCorrector); factory.features(features); CModelFactory::TDataGathererPtr gatherer{factory.makeDataGatherer(startTime)}; CModelFactory::TModelPtr model{factory.makeModel(gatherer)}; this->addPerson("p1", gatherer); params.s_ControlDecayRate = false; params.s_DecayRate = 0.0005; CEventRateModelFactory referenceFactory(params, interimBucketCorrector); referenceFactory.features(features); CModelFactory::TDataGathererPtr referenceGatherer{ referenceFactory.makeDataGatherer(startTime)}; CModelFactory::TModelPtr referenceModel{referenceFactory.makeModel(referenceGatherer)}; this->addPerson("p1", referenceGatherer); TMeanAccumulator meanPredictionError; TMeanAccumulator meanReferencePredictionError; model_t::CResultType type(model_t::CResultType::E_Unconditional | model_t::CResultType::E_Interim); for (core_t::TTime t = 0; t < 20 * core::constants::WEEK; t += bucketLength) { if (t % core::constants::WEEK == 0) { LOG_DEBUG(<< "week " << t / core::constants::WEEK + 1); } double rate = 10.0 * (1.0 + std::sin(boost::math::double_constants::two_pi * static_cast<double>(t) / static_cast<double>(core::constants::DAY))) * (1.0 + std::sin(boost::math::double_constants::two_pi * static_cast<double>(t) / 10.0 / static_cast<double>(core::constants::WEEK))); TDoubleVec noise; rng.generateUniformSamples(0.0, 3.0, 1, noise); for (std::size_t i = 0; i < static_cast<std::size_t>(rate + noise[0]); ++i) { this->addArrival( SMessage(t + bucketLength / 2, "p1", TOptionalDouble()), gatherer); this->addArrival(SMessage(t + bucketLength / 2, "p1", TOptionalDouble()), referenceGatherer); } model->sample(t, t + bucketLength, m_ResourceMonitor); referenceModel->sample(t, t + bucketLength, m_ResourceMonitor); meanPredictionError.add(std::fabs( model->currentBucketValue(feature, 0, 0, t + bucketLength / 2)[0] - model->baselineBucketMean(feature, 0, 0, type, NO_CORRELATES, t + bucketLength / 2)[0])); meanReferencePredictionError.add(std::fabs( referenceModel->currentBucketValue(feature, 0, 0, t + bucketLength / 2)[0] - referenceModel->baselineBucketMean(feature, 0, 0, type, NO_CORRELATES, t + bucketLength / 2)[0])); } LOG_DEBUG(<< "mean = " << maths::common::CBasicStatistics::mean(meanPredictionError)); LOG_DEBUG(<< "reference = " << maths::common::CBasicStatistics::mean(meanReferencePredictionError)); BOOST_TEST_REQUIRE( maths::common::CBasicStatistics::mean(meanPredictionError) < 0.8 * maths::common::CBasicStatistics::mean(meanReferencePredictionError)); } } BOOST_FIXTURE_TEST_CASE(testIgnoreSamplingGivenDetectionRules, CTestFixture) { // Create 2 models, one of which has a skip sampling rule. // Feed the same data into both models then add extra data // into the first model we know will be filtered out. // At the end the checksums of the underlying models are // compared to verify the effect of the skipped updates.
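// The rule fires when the actual bucket count exceeds 100, so the bucket of
// 110 records below updates the gatherer but not the model with the rule;
// the other model is wound forward over that bucket with skipSampling().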
// Create a rule to filter buckets where the count > 100 CRuleCondition condition; condition.appliesTo(CRuleCondition::E_Actual); condition.op(CRuleCondition::E_GT); condition.value(100.0); CDetectionRule rule; rule.action(CDetectionRule::E_SkipModelUpdate); rule.addCondition(condition); std::size_t bucketLength{100}; std::size_t startTime{100}; // Model without the skip sampling rule SModelParams paramsNoRules(bucketLength); auto interimBucketCorrector = std::make_shared<CInterimBucketCorrector>(bucketLength); CEventRateModelFactory factory(paramsNoRules, interimBucketCorrector); model_t::TFeatureVec features{model_t::E_IndividualCountByBucketAndPerson}; factory.features(features); CModelFactory::TDataGathererPtr gathererNoSkip{factory.makeDataGatherer(startTime)}; CModelFactory::TModelPtr modelPtrNoSkip{factory.makeModel(gathererNoSkip)}; auto* modelNoSkip = dynamic_cast<CEventRateModel*>(modelPtrNoSkip.get()); this->addPerson("p1", gathererNoSkip); // Model with the skip sampling rule SModelParams paramsWithRules(bucketLength); SModelParams::TDetectionRuleVec rules{rule}; paramsWithRules.s_DetectionRules = SModelParams::TDetectionRuleVecCRef(rules); CEventRateModelFactory factoryWithSkip(paramsWithRules, interimBucketCorrector); factoryWithSkip.features(features); CModelFactory::TDataGathererPtr gathererWithSkip{ factoryWithSkip.makeDataGatherer(startTime)}; CModelFactory::TModelPtr modelPtrWithSkip{factoryWithSkip.makeModel(gathererWithSkip)}; auto* modelWithSkip = dynamic_cast<CEventRateModel*>(modelPtrWithSkip.get()); this->addPerson("p1", gathererWithSkip); std::size_t endTime = startTime + bucketLength; // Add a bucket to both models for (int i = 0; i < 66; ++i) { this->addArrival(SMessage(startTime, "p1", TOptionalDouble()), gathererNoSkip); this->addArrival(SMessage(startTime, "p1", TOptionalDouble()), gathererWithSkip); } modelNoSkip->sample(startTime, endTime, m_ResourceMonitor); modelWithSkip->sample(startTime, endTime, m_ResourceMonitor); startTime = endTime; endTime += bucketLength; BOOST_REQUIRE_EQUAL(modelWithSkip->checksum(), modelNoSkip->checksum()); // Add a bucket to both models for (int i = 0; i < 55; ++i) { this->addArrival(SMessage(startTime, "p1", TOptionalDouble()), gathererNoSkip); this->addArrival(SMessage(startTime, "p1", TOptionalDouble()), gathererWithSkip); } modelNoSkip->sample(startTime, endTime, m_ResourceMonitor); modelWithSkip->sample(startTime, endTime, m_ResourceMonitor); startTime = endTime; endTime += bucketLength; BOOST_REQUIRE_EQUAL(modelWithSkip->checksum(), modelNoSkip->checksum()); // This sample will be skipped by the detection rule for (int i = 0; i < 110; ++i) { this->addArrival(SMessage(startTime, "p1", TOptionalDouble()), gathererWithSkip); } modelWithSkip->sample(startTime, endTime, m_ResourceMonitor); startTime = endTime; endTime += bucketLength; // Wind the other model forward modelNoSkip->skipSampling(startTime); for (int i = 0; i < 55; ++i) { this->addArrival(SMessage(startTime, "p1", TOptionalDouble()), gathererNoSkip); this->addArrival(SMessage(startTime, "p1", TOptionalDouble()), gathererWithSkip); } modelNoSkip->sample(startTime, endTime, m_ResourceMonitor); modelWithSkip->sample(startTime, endTime, m_ResourceMonitor); // Checksums will be different due to the data gatherers BOOST_TEST_REQUIRE(modelWithSkip->checksum() != modelNoSkip->checksum()); // so compare the underlying residual models directly CAnomalyDetectorModel::TModelDetailsViewUPtr modelWithSkipView = modelWithSkip->details(); CAnomalyDetectorModel::TModelDetailsViewUPtr
modelNoSkipView = modelNoSkip->details(); std::uint64_t withSkipChecksum{ static_cast<const maths::time_series::CUnivariateTimeSeriesModel*>( modelWithSkipView->model(model_t::E_IndividualCountByBucketAndPerson, 0)) ->residualModel() .checksum()}; std::uint64_t noSkipChecksum{ static_cast<const maths::time_series::CUnivariateTimeSeriesModel*>( modelNoSkipView->model(model_t::E_IndividualCountByBucketAndPerson, 0)) ->residualModel() .checksum()}; // Checksums differ due to different weighting applied to samples for the "skip" model BOOST_TEST_REQUIRE(withSkipChecksum != noSkipChecksum); // Check the last value times of the underlying models are the same const maths::time_series::CUnivariateTimeSeriesModel* timeSeriesModel = dynamic_cast<const maths::time_series::CUnivariateTimeSeriesModel*>( modelNoSkipView->model(model_t::E_IndividualCountByBucketAndPerson, 0)); BOOST_TEST_REQUIRE(timeSeriesModel); const auto* trendModel = dynamic_cast<const maths::time_series::CTimeSeriesDecomposition*>( &timeSeriesModel->trendModel()); BOOST_TEST_REQUIRE(trendModel); core_t::TTime time = trendModel->lastValueTime(); BOOST_REQUIRE_EQUAL(model_t::sampleTime(model_t::E_IndividualCountByBucketAndPerson, startTime, bucketLength), time); timeSeriesModel = dynamic_cast<const maths::time_series::CUnivariateTimeSeriesModel*>( modelWithSkipView->model(model_t::E_IndividualCountByBucketAndPerson, 0)); BOOST_TEST_REQUIRE(timeSeriesModel); trendModel = dynamic_cast<const maths::time_series::CTimeSeriesDecomposition*>( &timeSeriesModel->trendModel()); BOOST_TEST_REQUIRE(trendModel); BOOST_REQUIRE_EQUAL(time, trendModel->lastValueTime()); } BOOST_FIXTURE_TEST_CASE(testRareScoreExplanations, CTestFixture) { // Ensure that first occurrence of the category and actual and typical concentrations of // the anomaly score explanation are correct.
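// p5 first appears in the final bucket, so its by-field first occurrence
// flag should be set while p1-p4's should not, and the typical concentration
// reported for every person should equal the median of the actual
// concentrations.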
const core_t::TTime startTime{1346968800}; const core_t::TTime bucketLength{3600}; SModelParams params(bucketLength); this->makeModel(params, {model_t::E_IndividualTotalBucketCountByPerson, model_t::E_IndividualIndicatorOfBucketPerson}, startTime, 5); auto* model = dynamic_cast<CEventRateModel*>(m_Model.get()); core_t::TTime time{startTime}; for (/**/; time < startTime + 10 * bucketLength; time += bucketLength) { this->addArrival(SMessage(time + bucketLength / 2, "p1", TOptionalDouble()), m_Gatherer); this->addArrival(SMessage(time + bucketLength / 2, "p2", TOptionalDouble()), m_Gatherer); model->sample(time, time + bucketLength, m_ResourceMonitor); } for (/**/; time < startTime + 50 * bucketLength; time += bucketLength) { this->addArrival(SMessage(time + bucketLength / 2, "p1", TOptionalDouble()), m_Gatherer); this->addArrival(SMessage(time + bucketLength / 2, "p2", TOptionalDouble()), m_Gatherer); this->addArrival(SMessage(time + bucketLength / 2, "p3", TOptionalDouble()), m_Gatherer); this->addArrival(SMessage(time + bucketLength / 2, "p4", TOptionalDouble()), m_Gatherer); model->sample(time, time + bucketLength, m_ResourceMonitor); } this->addArrival(SMessage(time + bucketLength / 2, "p1", TOptionalDouble()), m_Gatherer); this->addArrival(SMessage(time + bucketLength / 2, "p2", TOptionalDouble()), m_Gatherer); this->addArrival(SMessage(time + bucketLength / 2, "p3", TOptionalDouble()), m_Gatherer); this->addArrival(SMessage(time + bucketLength / 2, "p4", TOptionalDouble()), m_Gatherer); this->addArrival(SMessage(time + bucketLength / 2, "p5", TOptionalDouble()), m_Gatherer); model->sample(time, time + bucketLength, m_ResourceMonitor); TDoubleVec actualConcentrations; TDoubleVec typicalConcentrations; for (std::size_t pid = 0; pid < 5; ++pid) { SAnnotatedProbability annotatedProbability; CPartitioningFields partitioningFields(EMPTY_STRING, EMPTY_STRING); BOOST_TEST_REQUIRE(model->computeProbability( pid, time, time + bucketLength, partitioningFields, 0, annotatedProbability)); LOG_DEBUG(<< "anomaly score explanation = " << annotatedProbability.s_AnomalyScoreExplanation); if (pid == 4) { BOOST_REQUIRE_EQUAL( true, annotatedProbability.s_AnomalyScoreExplanation.s_ByFieldFirstOccurrence); } else { BOOST_REQUIRE_EQUAL( false, annotatedProbability.s_AnomalyScoreExplanation.s_ByFieldFirstOccurrence); } actualConcentrations.push_back( annotatedProbability.s_AnomalyScoreExplanation.s_ByFieldActualConcentration); typicalConcentrations.push_back( annotatedProbability.s_AnomalyScoreExplanation.s_ByFieldTypicalConcentration); } double medianConcentration{maths::common::CBasicStatistics::median(actualConcentrations)}; for (const auto& typicalConcentration : typicalConcentrations) { BOOST_REQUIRE_EQUAL(medianConcentration, typicalConcentration); } } BOOST_AUTO_TEST_SUITE_END()