in lib/api/unittest/CSingleStreamDataAdderTest.cc [52:192]
void detectorPersistHelper(const std::string& configFileName,
const std::string& inputFilename,
int latencyBuckets,
const std::string& timeFormat = std::string()) {
// Start by creating a detector with non-trivial state
static const ml::core_t::TTime BUCKET_SIZE(3600);
static const std::string JOB_ID("job");
// Open the input and output files
std::ifstream inputStrm(inputFilename.c_str());
BOOST_TEST_REQUIRE(inputStrm.is_open());
std::ofstream outputStrm(ml::core::COsFileFuncs::NULL_FILENAME);
BOOST_TEST_REQUIRE(outputStrm.is_open());
ml::model::CLimits limits;
ml::api::CAnomalyJobConfig jobConfig;
BOOST_TEST_REQUIRE(jobConfig.initFromFile(configFileName));
ml::model::CAnomalyDetectorModelConfig modelConfig =
ml::model::CAnomalyDetectorModelConfig::defaultConfig(
BUCKET_SIZE, ml::model_t::E_None, "", BUCKET_SIZE * latencyBuckets, false);
ml::core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm);
std::string origSnapshotId;
std::size_t numOrigDocs(0);
std::string origPersistedState;
{
CTestAnomalyJob origJob(
JOB_ID, limits, jobConfig, modelConfig, wrappedOutputStream,
std::bind(&reportPersistComplete, std::placeholders::_1,
std::ref(origSnapshotId), std::ref(numOrigDocs)),
nullptr, -1, "time", timeFormat);
// The categorizer knows how to assign categories to records
CTestFieldDataCategorizer categorizer(JOB_ID, jobConfig.analysisConfig(),
limits, &origJob, wrappedOutputStream);
ml::api::CDataProcessor* firstProcessor{nullptr};
if (jobConfig.analysisConfig().categorizationFieldName().empty() == false) {
LOG_DEBUG(<< "Applying the categorization categorizer for anomaly detection");
firstProcessor = &categorizer;
} else {
firstProcessor = &origJob;
}
using TInputParserUPtr = std::unique_ptr<ml::api::CInputParser>;
const TInputParserUPtr parser{[&inputFilename, &inputStrm]() -> TInputParserUPtr {
ml::api::CInputParser::TStrVec mutableFields{CTestFieldDataCategorizer::MLCATEGORY_NAME};
if (inputFilename.rfind(".csv") == inputFilename.length() - 4) {
return std::make_unique<ml::api::CCsvInputParser>(
std::move(mutableFields), inputStrm);
}
return std::make_unique<ml::api::CNdJsonInputParser>(
std::move(mutableFields), inputStrm);
}()};
BOOST_TEST_REQUIRE(parser->readStreamIntoMaps(
[firstProcessor](const ml::api::CDataProcessor::TStrStrUMap& dataRowFields) {
return firstProcessor->handleRecord(
dataRowFields, ml::api::CDataProcessor::TOptionalTime{});
}));
// Persist the detector state to a stringstream
std::ostringstream* strm(nullptr);
ml::api::CSingleStreamDataAdder::TOStreamP ptr(strm = new std::ostringstream());
ml::api::CSingleStreamDataAdder persister(ptr);
BOOST_TEST_REQUIRE(firstProcessor->persistStateInForeground(persister, ""));
origPersistedState = strm->str();
}
// Now restore the state into a different detector
std::string restoredSnapshotId;
std::size_t numRestoredDocs(0);
std::string newPersistedState;
{
CTestAnomalyJob restoredJob(
JOB_ID, limits, jobConfig, modelConfig, wrappedOutputStream,
std::bind(&reportPersistComplete, std::placeholders::_1,
std::ref(restoredSnapshotId), std::ref(numRestoredDocs)));
// The categorizer knows how to assign categories to records
CTestFieldDataCategorizer restoredCategorizer(
JOB_ID, jobConfig.analysisConfig(), limits, &restoredJob, wrappedOutputStream);
size_t numCategorizerDocs(0);
ml::api::CDataProcessor* restoredFirstProcessor{nullptr};
if (jobConfig.analysisConfig().categorizationFieldName().empty() == false) {
LOG_DEBUG(<< "Applying the categorization categorizer for anomaly detection");
numCategorizerDocs = 1;
restoredFirstProcessor = &restoredCategorizer;
} else {
restoredFirstProcessor = &restoredJob;
}
{
ml::core_t::TTime completeToTime(0);
auto strm = std::make_shared<boost::iostreams::filtering_istream>();
strm->push(ml::api::CStateRestoreStreamFilter());
std::istringstream inputStream(origPersistedState);
strm->push(inputStream);
ml::api::CSingleStreamSearcher retriever(strm);
BOOST_TEST_REQUIRE(restoredFirstProcessor->restoreState(retriever, completeToTime));
BOOST_TEST_REQUIRE(completeToTime > 0);
BOOST_REQUIRE_EQUAL(
numOrigDocs + numCategorizerDocs,
strm->component<ml::api::CStateRestoreStreamFilter>(0)->getDocCount());
}
// Finally, persist the new detector state and compare the result
std::ostringstream* strm(nullptr);
ml::api::CSingleStreamDataAdder::TOStreamP ptr(strm = new std::ostringstream());
ml::api::CSingleStreamDataAdder persister(ptr);
BOOST_TEST_REQUIRE(restoredFirstProcessor->persistStateInForeground(persister, ""));
newPersistedState = strm->str();
}
BOOST_REQUIRE_EQUAL(numOrigDocs, numRestoredDocs);
// The snapshot ID can be different between the two persists, so replace the
// first occurrence of it (which is in the bulk metadata)
BOOST_REQUIRE_EQUAL(1, ml::core::CStringUtils::replaceFirst(
origSnapshotId, "snap", origPersistedState));
BOOST_REQUIRE_EQUAL(1, ml::core::CStringUtils::replaceFirst(
restoredSnapshotId, "snap", newPersistedState));
// Replace the zero byte separators to avoid '\0's in the output if the
// test fails
std::replace(origPersistedState.begin(), origPersistedState.end(), '\0', ',');
std::replace(newPersistedState.begin(), newPersistedState.end(), '\0', ',');
BOOST_REQUIRE_EQUAL(origPersistedState, newPersistedState);
}