lib/api/unittest/CSingleStreamDataAdderTest.cc (159 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <core/CJsonOutputStreamWrapper.h>
#include <core/COsFileFuncs.h>
#include <core/CStringUtils.h>
#include <core/CoreTypes.h>
#include <maths/common/CModelWeight.h>
#include <model/CAnomalyDetectorModelConfig.h>
#include <model/CLimits.h>
#include <api/CAnomalyJobConfig.h>
#include <api/CCsvInputParser.h>
#include <api/CNdJsonInputParser.h>
#include <api/CSingleStreamDataAdder.h>
#include <api/CSingleStreamSearcher.h>
#include <api/CStateRestoreStreamFilter.h>
#include "CTestAnomalyJob.h"
#include "CTestFieldDataCategorizer.h"
#include <boost/iostreams/filtering_stream.hpp>
#include <boost/test/unit_test.hpp>
#include <fstream>
#include <memory>
#include <sstream>
#include <string>
BOOST_AUTO_TEST_SUITE(CSingleStreamDataAdderTest)
namespace {
void reportPersistComplete(ml::api::CModelSnapshotJsonWriter::SModelSnapshotReport modelSnapshotReport,
std::string& snapshotIdOut,
size_t& numDocsOut) {
LOG_INFO(<< "Persist complete with description: " << modelSnapshotReport.s_Description);
snapshotIdOut = modelSnapshotReport.s_SnapshotId;
numDocsOut = modelSnapshotReport.s_NumDocs;
}
void detectorPersistHelper(const std::string& configFileName,
const std::string& inputFilename,
int latencyBuckets,
const std::string& timeFormat = std::string()) {
// Start by creating a detector with non-trivial state
static const ml::core_t::TTime BUCKET_SIZE(3600);
static const std::string JOB_ID("job");
// Open the input and output files
std::ifstream inputStrm(inputFilename.c_str());
BOOST_TEST_REQUIRE(inputStrm.is_open());
std::ofstream outputStrm(ml::core::COsFileFuncs::NULL_FILENAME);
BOOST_TEST_REQUIRE(outputStrm.is_open());
ml::model::CLimits limits;
ml::api::CAnomalyJobConfig jobConfig;
BOOST_TEST_REQUIRE(jobConfig.initFromFile(configFileName));
ml::model::CAnomalyDetectorModelConfig modelConfig =
ml::model::CAnomalyDetectorModelConfig::defaultConfig(
BUCKET_SIZE, ml::model_t::E_None, "", BUCKET_SIZE * latencyBuckets, false);
ml::core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm);
std::string origSnapshotId;
std::size_t numOrigDocs(0);
std::string origPersistedState;
{
CTestAnomalyJob origJob(
JOB_ID, limits, jobConfig, modelConfig, wrappedOutputStream,
std::bind(&reportPersistComplete, std::placeholders::_1,
std::ref(origSnapshotId), std::ref(numOrigDocs)),
nullptr, -1, "time", timeFormat);
// The categorizer knows how to assign categories to records
CTestFieldDataCategorizer categorizer(JOB_ID, jobConfig.analysisConfig(),
limits, &origJob, wrappedOutputStream);
ml::api::CDataProcessor* firstProcessor{nullptr};
if (jobConfig.analysisConfig().categorizationFieldName().empty() == false) {
LOG_DEBUG(<< "Applying the categorization categorizer for anomaly detection");
firstProcessor = &categorizer;
} else {
firstProcessor = &origJob;
}
using TInputParserUPtr = std::unique_ptr<ml::api::CInputParser>;
const TInputParserUPtr parser{[&inputFilename, &inputStrm]() -> TInputParserUPtr {
ml::api::CInputParser::TStrVec mutableFields{CTestFieldDataCategorizer::MLCATEGORY_NAME};
if (inputFilename.rfind(".csv") == inputFilename.length() - 4) {
return std::make_unique<ml::api::CCsvInputParser>(
std::move(mutableFields), inputStrm);
}
return std::make_unique<ml::api::CNdJsonInputParser>(
std::move(mutableFields), inputStrm);
}()};
BOOST_TEST_REQUIRE(parser->readStreamIntoMaps(
[firstProcessor](const ml::api::CDataProcessor::TStrStrUMap& dataRowFields) {
return firstProcessor->handleRecord(
dataRowFields, ml::api::CDataProcessor::TOptionalTime{});
}));
// Persist the detector state to a stringstream
std::ostringstream* strm(nullptr);
ml::api::CSingleStreamDataAdder::TOStreamP ptr(strm = new std::ostringstream());
ml::api::CSingleStreamDataAdder persister(ptr);
BOOST_TEST_REQUIRE(firstProcessor->persistStateInForeground(persister, ""));
origPersistedState = strm->str();
}
// Now restore the state into a different detector
std::string restoredSnapshotId;
std::size_t numRestoredDocs(0);
std::string newPersistedState;
{
CTestAnomalyJob restoredJob(
JOB_ID, limits, jobConfig, modelConfig, wrappedOutputStream,
std::bind(&reportPersistComplete, std::placeholders::_1,
std::ref(restoredSnapshotId), std::ref(numRestoredDocs)));
// The categorizer knows how to assign categories to records
CTestFieldDataCategorizer restoredCategorizer(
JOB_ID, jobConfig.analysisConfig(), limits, &restoredJob, wrappedOutputStream);
size_t numCategorizerDocs(0);
ml::api::CDataProcessor* restoredFirstProcessor{nullptr};
if (jobConfig.analysisConfig().categorizationFieldName().empty() == false) {
LOG_DEBUG(<< "Applying the categorization categorizer for anomaly detection");
numCategorizerDocs = 1;
restoredFirstProcessor = &restoredCategorizer;
} else {
restoredFirstProcessor = &restoredJob;
}
{
ml::core_t::TTime completeToTime(0);
auto strm = std::make_shared<boost::iostreams::filtering_istream>();
strm->push(ml::api::CStateRestoreStreamFilter());
std::istringstream inputStream(origPersistedState);
strm->push(inputStream);
ml::api::CSingleStreamSearcher retriever(strm);
BOOST_TEST_REQUIRE(restoredFirstProcessor->restoreState(retriever, completeToTime));
BOOST_TEST_REQUIRE(completeToTime > 0);
BOOST_REQUIRE_EQUAL(
numOrigDocs + numCategorizerDocs,
strm->component<ml::api::CStateRestoreStreamFilter>(0)->getDocCount());
}
// Finally, persist the new detector state and compare the result
std::ostringstream* strm(nullptr);
ml::api::CSingleStreamDataAdder::TOStreamP ptr(strm = new std::ostringstream());
ml::api::CSingleStreamDataAdder persister(ptr);
BOOST_TEST_REQUIRE(restoredFirstProcessor->persistStateInForeground(persister, ""));
newPersistedState = strm->str();
}
BOOST_REQUIRE_EQUAL(numOrigDocs, numRestoredDocs);
// The snapshot ID can be different between the two persists, so replace the
// first occurrence of it (which is in the bulk metadata)
BOOST_REQUIRE_EQUAL(1, ml::core::CStringUtils::replaceFirst(
origSnapshotId, "snap", origPersistedState));
BOOST_REQUIRE_EQUAL(1, ml::core::CStringUtils::replaceFirst(
restoredSnapshotId, "snap", newPersistedState));
// Replace the zero byte separators to avoid '\0's in the output if the
// test fails
std::replace(origPersistedState.begin(), origPersistedState.end(), '\0', ',');
std::replace(newPersistedState.begin(), newPersistedState.end(), '\0', ',');
BOOST_REQUIRE_EQUAL(origPersistedState, newPersistedState);
}
}
BOOST_AUTO_TEST_CASE(testDetectorPersistBy) {
detectorPersistHelper("testfiles/new_mlfields.json",
"testfiles/big_ascending.txt", 0, "%d/%b/%Y:%T %z");
}
BOOST_AUTO_TEST_CASE(testDetectorPersistOver) {
detectorPersistHelper("testfiles/new_mlfields_over.json",
"testfiles/big_ascending.txt", 0, "%d/%b/%Y:%T %z");
}
BOOST_AUTO_TEST_CASE(testDetectorPersistPartition) {
detectorPersistHelper("testfiles/new_mlfields_partition.json",
"testfiles/big_ascending.txt", 0, "%d/%b/%Y:%T %z");
}
BOOST_AUTO_TEST_CASE(testDetectorPersistDc) {
detectorPersistHelper("testfiles/new_persist_dc.json",
"testfiles/files_users_programs.csv", 5);
}
BOOST_AUTO_TEST_CASE(testDetectorPersistCount) {
detectorPersistHelper("testfiles/new_persist_count.json",
"testfiles/files_users_programs.csv", 5);
}
BOOST_AUTO_TEST_CASE(testDetectorPersistCategorization) {
detectorPersistHelper("testfiles/new_persist_categorization.json",
"testfiles/time_messages.csv", 0);
}
BOOST_AUTO_TEST_SUITE_END()