lib/api/unittest/CForecastRunnerTest.cc (341 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <core/CJsonOutputStreamWrapper.h>
#include <core/CLogger.h>
#include <core/Constants.h>
#include <model/CAnomalyDetectorModelConfig.h>
#include <model/CLimits.h>
#include <api/CAnomalyJobConfig.h>
#include "CTestAnomalyJob.h"
#include <boost/test/unit_test.hpp>
#include <cmath>
#include <memory>
#include <string>
BOOST_AUTO_TEST_SUITE(CForecastRunnerTest)
namespace {
using TGenerateRecord = void (*)(ml::core_t::TTime time,
CTestAnomalyJob::TStrStrUMap& dataRows);
const ml::core_t::TTime START_TIME{12000000};
const ml::core_t::TTime BUCKET_LENGTH{3600};
void generateRecord(ml::core_t::TTime time, CTestAnomalyJob::TStrStrUMap& dataRows) {
dataRows["time"] = ml::core::CStringUtils::typeToString(time);
}
void generateRecordWithSummaryCount(ml::core_t::TTime time,
CTestAnomalyJob::TStrStrUMap& dataRows) {
double x = static_cast<double>(time - START_TIME) / BUCKET_LENGTH;
double count = (std::sin(x / 4.0) + 1.0) * 42.0 * std::pow(1.005, x);
dataRows["time"] = ml::core::CStringUtils::typeToString(time);
dataRows["count"] = ml::core::CStringUtils::typeToString(count);
}
void generateRecordWithStatus(ml::core_t::TTime time, CTestAnomalyJob::TStrStrUMap& dataRows) {
dataRows["time"] = ml::core::CStringUtils::typeToString(time);
dataRows["status"] = (time / BUCKET_LENGTH) % 919 == 0 ? "404" : "200";
}
void generatePopulationRecord(ml::core_t::TTime time, CTestAnomalyJob::TStrStrUMap& dataRows) {
dataRows["time"] = ml::core::CStringUtils::typeToString(time);
dataRows["person"] = "jill";
}
void populateJob(TGenerateRecord generateRecord, CTestAnomalyJob& job, std::size_t buckets = 1000) {
ml::core_t::TTime time = START_TIME;
CTestAnomalyJob::TStrStrUMap dataRows;
for (std::size_t bucket = 0; bucket < 2 * buckets;
++bucket, time += (BUCKET_LENGTH / 2)) {
generateRecord(time, dataRows);
BOOST_TEST_REQUIRE(job.handleRecord(dataRows));
}
BOOST_REQUIRE_EQUAL(std::uint64_t(2 * buckets), job.numRecordsHandled());
}
}
BOOST_AUTO_TEST_CASE(testSummaryCount) {
std::stringstream outputStrm;
{
ml::core::CJsonOutputStreamWrapper streamWrapper(outputStrm);
ml::model::CLimits limits;
ml::api::CAnomalyJobConfig jobConfig =
CTestAnomalyJob::makeSimpleJobConfig("count", "", "", "", "", {}, "count");
ml::model::CAnomalyDetectorModelConfig modelConfig =
ml::model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_LENGTH);
CTestAnomalyJob job("job", limits, jobConfig, modelConfig, streamWrapper);
populateJob(generateRecordWithSummaryCount, job);
CTestAnomalyJob::TStrStrUMap dataRows;
dataRows["."] = "p{\"duration\":" + std::to_string(13 * BUCKET_LENGTH) +
",\"forecast_id\": \"42\"" + ",\"forecast_alias\": \"sumcount\"" +
",\"create_time\": \"1511370819\"" + ",\"expires_in\": \"" +
std::to_string(100 * ml::core::constants::DAY) + "\" }";
BOOST_TEST_REQUIRE(job.handleRecord(dataRows));
}
json::error_code ec;
json::value doc = json::parse(outputStrm.str(), ec);
BOOST_TEST_REQUIRE(ec.failed() == false);
BOOST_TEST_REQUIRE(doc.as_array().size() > 0);
bool foundScheduledRecord = false;
bool foundStartedRecord = false;
for (const auto& m_ : doc.as_array()) {
const json::object& m = m_.as_object();
if (m.contains("model_forecast_request_stats")) {
const json::value& forecastStart = m.at("model_forecast_request_stats");
if (std::strcmp("scheduled",
forecastStart.at("forecast_status").as_string().c_str()) == 0) {
BOOST_TEST_REQUIRE(!foundStartedRecord);
foundScheduledRecord = true;
} else if (std::strcmp(
"started",
forecastStart.at("forecast_status").as_string().c_str()) == 0) {
BOOST_TEST_REQUIRE(foundScheduledRecord);
foundStartedRecord = true;
break;
}
}
}
BOOST_TEST_REQUIRE(foundScheduledRecord);
BOOST_TEST_REQUIRE(foundStartedRecord);
const json::value& lastElement = doc.as_array()[doc.as_array().size() - 1];
BOOST_TEST_REQUIRE(lastElement.as_object().contains("model_forecast_request_stats"));
const json::value& forecastStats_ = lastElement.at_pointer("/model_forecast_request_stats");
const json::object& forecastStats = forecastStats_.as_object();
BOOST_REQUIRE_EQUAL(std::string("42"),
std::string(forecastStats.at("forecast_id").as_string()));
BOOST_REQUIRE_EQUAL(std::string("sumcount"),
std::string(forecastStats.at("forecast_alias").as_string()));
BOOST_REQUIRE_EQUAL(
1511370819 * int64_t(1000),
forecastStats.at("forecast_create_timestamp").to_number<std::int64_t>());
BOOST_TEST_REQUIRE(forecastStats.contains("processed_record_count"));
BOOST_REQUIRE_EQUAL(
13, forecastStats.at("processed_record_count").to_number<std::int64_t>());
BOOST_REQUIRE_EQUAL(1.0, forecastStats.at("forecast_progress").to_number<double>());
BOOST_REQUIRE_EQUAL(std::string("finished"),
std::string(forecastStats.at("forecast_status").as_string()));
BOOST_REQUIRE_EQUAL(15591600 * int64_t(1000),
forecastStats.at("timestamp").to_number<std::int64_t>());
BOOST_REQUIRE_EQUAL(
15591600 * int64_t(1000),
forecastStats.at("forecast_start_timestamp").to_number<std::int64_t>());
BOOST_REQUIRE_EQUAL(
(15591600 + 13 * BUCKET_LENGTH) * int64_t(1000),
forecastStats.at("forecast_end_timestamp").to_number<std::int64_t>());
BOOST_REQUIRE_EQUAL(
(1511370819 + 100 * ml::core::constants::DAY) * int64_t(1000),
forecastStats.at("forecast_expiry_timestamp").to_number<std::int64_t>());
}
BOOST_AUTO_TEST_CASE(testPopulation) {
std::stringstream outputStrm;
{
ml::core::CJsonOutputStreamWrapper streamWrapper(outputStrm);
ml::model::CLimits limits;
ml::api::CAnomalyJobConfig jobConfig =
CTestAnomalyJob::makeSimpleJobConfig("count", "", "", "person", "");
ml::model::CAnomalyDetectorModelConfig modelConfig =
ml::model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_LENGTH);
CTestAnomalyJob job("job", limits, jobConfig, modelConfig, streamWrapper);
populateJob(generatePopulationRecord, job);
CTestAnomalyJob::TStrStrUMap dataRows;
dataRows["."] = "p{\"duration\":" + std::to_string(13 * BUCKET_LENGTH) +
",\"forecast_id\": \"31\"" + ",\"create_time\": \"1511370819\" }";
BOOST_TEST_REQUIRE(job.handleRecord(dataRows));
}
json::error_code ec;
json::value doc = json::parse(outputStrm.str(), ec);
BOOST_TEST_REQUIRE(ec.failed() == false);
BOOST_TEST_REQUIRE(doc.is_array());
const json::value& lastElement_ = doc.as_array()[doc.as_array().size() - 1];
const json::object& lastElement = lastElement_.as_object();
BOOST_TEST_REQUIRE(lastElement.contains("model_forecast_request_stats"));
const json::value& forecastStats_ = lastElement.at("model_forecast_request_stats");
const json::object& forecastStats = forecastStats_.as_object();
BOOST_REQUIRE_EQUAL(std::string("31"),
std::string(forecastStats.at("forecast_id").as_string()));
BOOST_TEST_REQUIRE(!forecastStats.contains("forecast_alias"));
BOOST_REQUIRE_EQUAL(std::string("failed"),
std::string(forecastStats.at("forecast_status").as_string()));
BOOST_REQUIRE_EQUAL(
ml::api::CForecastRunner::ERROR_NOT_SUPPORTED_FOR_POPULATION_MODELS,
std::string(forecastStats.at("forecast_messages").as_array()[0].as_string()));
BOOST_REQUIRE_EQUAL(
(1511370819 + 14 * ml::core::constants::DAY) * int64_t(1000),
forecastStats.at("forecast_expiry_timestamp").to_number<std::int64_t>());
}
BOOST_AUTO_TEST_CASE(testRare) {
std::stringstream outputStrm;
{
ml::core::CJsonOutputStreamWrapper streamWrapper(outputStrm);
ml::model::CLimits limits;
ml::api::CAnomalyJobConfig jobConfig =
CTestAnomalyJob::makeSimpleJobConfig("rare", "", "status", "", "");
ml::model::CAnomalyDetectorModelConfig modelConfig =
ml::model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_LENGTH);
CTestAnomalyJob job("job", limits, jobConfig, modelConfig, streamWrapper);
populateJob(generateRecordWithStatus, job, 5000);
CTestAnomalyJob::TStrStrUMap dataRows;
dataRows["."] = "p{\"duration\":" + std::to_string(13 * BUCKET_LENGTH) +
",\"forecast_id\": \"42\"" + ",\"create_time\": \"1511370819\"" +
",\"expires_in\": \"8640000\" }";
BOOST_TEST_REQUIRE(job.handleRecord(dataRows));
}
json::error_code ec;
json::value doc = json::parse(outputStrm.str(), ec);
BOOST_TEST_REQUIRE(ec.failed() == false);
BOOST_TEST_REQUIRE(doc.is_array());
const json::value& lastElement_ = doc.as_array()[doc.as_array().size() - 1];
const json::object& lastElement = lastElement_.as_object();
BOOST_TEST_REQUIRE(lastElement.contains("model_forecast_request_stats"));
const json::value& forecastStats_ = lastElement.at("model_forecast_request_stats");
const json::object& forecastStats = forecastStats_.as_object();
BOOST_REQUIRE_EQUAL(std::string("42"),
std::string(forecastStats.at("forecast_id").as_string()));
BOOST_TEST_REQUIRE(!forecastStats.contains("forecast_alias"));
BOOST_REQUIRE_EQUAL(std::string("failed"),
std::string(forecastStats.at("forecast_status").as_string()));
BOOST_REQUIRE_EQUAL(
ml::api::CForecastRunner::ERROR_NO_SUPPORTED_FUNCTIONS,
std::string(forecastStats.at("forecast_messages").as_array()[0].as_string()));
BOOST_REQUIRE_EQUAL(
(1511370819 + 14 * ml::core::constants::DAY) * int64_t(1000),
forecastStats.at("forecast_expiry_timestamp").to_number<std::int64_t>());
}
BOOST_AUTO_TEST_CASE(testInsufficientData) {
std::stringstream outputStrm;
{
ml::core::CJsonOutputStreamWrapper streamWrapper(outputStrm);
ml::model::CLimits limits;
ml::api::CAnomalyJobConfig jobConfig =
CTestAnomalyJob::makeSimpleJobConfig("count", "", "", "", "");
ml::model::CAnomalyDetectorModelConfig modelConfig =
ml::model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_LENGTH);
CTestAnomalyJob job("job", limits, jobConfig, modelConfig, streamWrapper);
populateJob(generateRecord, job, 3);
CTestAnomalyJob::TStrStrUMap dataRows;
dataRows["."] = "p{\"duration\":" + std::to_string(13 * BUCKET_LENGTH) +
",\"forecast_id\": \"31\"" + ",\"create_time\": \"1511370819\" }";
BOOST_TEST_REQUIRE(job.handleRecord(dataRows));
}
json::error_code ec;
json::value doc = json::parse(outputStrm.str(), ec);
BOOST_TEST_REQUIRE(ec.failed() == false);
BOOST_TEST_REQUIRE(doc.is_array());
const json::value& lastElement_ = doc.as_array()[doc.as_array().size() - 1];
const json::object& lastElement = lastElement_.as_object();
BOOST_TEST_REQUIRE(lastElement.contains("model_forecast_request_stats"));
const json::value& forecastStats_ = lastElement.at("model_forecast_request_stats");
const json::object& forecastStats = forecastStats_.as_object();
LOG_DEBUG(<< "forecastStats: " << forecastStats);
BOOST_REQUIRE_EQUAL(std::string("31"),
std::string(forecastStats.at("forecast_id").as_string()));
BOOST_REQUIRE_EQUAL(std::string("finished"),
std::string(forecastStats.at("forecast_status").as_string()));
BOOST_REQUIRE_EQUAL(1.0, forecastStats.at("forecast_progress").to_number<double>());
BOOST_REQUIRE_EQUAL(
ml::api::CForecastRunner::INFO_NO_MODELS_CAN_CURRENTLY_BE_FORECAST,
std::string(forecastStats.at("forecast_messages").as_array()[0].as_string()));
BOOST_REQUIRE_EQUAL(
(1511370819 + 14 * ml::core::constants::DAY) * int64_t(1000),
forecastStats.at("forecast_expiry_timestamp").to_number<std::int64_t>());
}
BOOST_AUTO_TEST_CASE(testValidateDefaultExpiry) {
ml::api::CForecastRunner::SForecast forecastJob;
std::string message("p{\"duration\":" + std::to_string(2 * ml::core::constants::WEEK) +
",\"forecast_id\": \"42\"" + ",\"create_time\": \"1511370819\" }");
BOOST_TEST_REQUIRE(ml::api::CForecastRunner::parseAndValidateForecastRequest(
message, forecastJob, 1400000000));
BOOST_REQUIRE_EQUAL(2 * ml::core::constants::WEEK, forecastJob.s_Duration);
BOOST_REQUIRE_EQUAL(14 * ml::core::constants::DAY + 1511370819, forecastJob.s_ExpiryTime);
std::string message2("p{\"duration\":" + std::to_string(2 * ml::core::constants::WEEK) +
",\"forecast_id\": \"42\"" +
",\"create_time\": \"1511370819\"" + ",\"expires_in\": -1 }");
BOOST_TEST_REQUIRE(ml::api::CForecastRunner::parseAndValidateForecastRequest(
message2, forecastJob, 1400000000));
BOOST_REQUIRE_EQUAL(2 * ml::core::constants::WEEK, forecastJob.s_Duration);
BOOST_REQUIRE_EQUAL(14 * ml::core::constants::DAY + 1511370819, forecastJob.s_ExpiryTime);
}
BOOST_AUTO_TEST_CASE(testValidateNoExpiry) {
ml::api::CForecastRunner::SForecast forecastJob;
std::string message("p{\"duration\":" + std::to_string(3 * ml::core::constants::WEEK) +
",\"forecast_id\": \"42\"" +
",\"create_time\": \"1511370819\"" + ",\"expires_in\": 0 }");
BOOST_TEST_REQUIRE(ml::api::CForecastRunner::parseAndValidateForecastRequest(
message, forecastJob, 1400000000));
BOOST_REQUIRE_EQUAL(3 * ml::core::constants::WEEK, forecastJob.s_Duration);
BOOST_REQUIRE_EQUAL(ml::core_t::TTime(1511370819), forecastJob.s_ExpiryTime);
BOOST_REQUIRE_EQUAL(forecastJob.s_CreateTime, forecastJob.s_ExpiryTime);
}
BOOST_AUTO_TEST_CASE(testValidateInvalidExpiry) {
ml::api::CForecastRunner::SForecast forecastJob;
std::string message("p{\"duration\":" + std::to_string(3 * ml::core::constants::WEEK) +
",\"forecast_id\": \"42\"" + ",\"create_time\": \"1511370819\"" +
",\"expires_in\": -244 }");
BOOST_TEST_REQUIRE(ml::api::CForecastRunner::parseAndValidateForecastRequest(
message, forecastJob, 1400000000));
BOOST_REQUIRE_EQUAL(3 * ml::core::constants::WEEK, forecastJob.s_Duration);
BOOST_REQUIRE_EQUAL(14 * ml::core::constants::DAY + 1511370819, forecastJob.s_ExpiryTime);
}
BOOST_AUTO_TEST_CASE(testValidateBrokenMessage) {
ml::api::CForecastRunner::SForecast forecastJob;
std::string message("p{\"dura");
BOOST_TEST_REQUIRE(ml::api::CForecastRunner::parseAndValidateForecastRequest(
message, forecastJob, 1400000000) == false);
}
BOOST_AUTO_TEST_CASE(testValidateMissingId) {
ml::api::CForecastRunner::SForecast forecastJob;
std::string message("p{\"duration\":" + std::to_string(3 * ml::core::constants::WEEK) +
",\"create_time\": \"1511370819\"}");
BOOST_TEST_REQUIRE(ml::api::CForecastRunner::parseAndValidateForecastRequest(
message, forecastJob, 1400000000) == false);
}
BOOST_AUTO_TEST_CASE(testValidateProvidedMinDiskSpace) {
ml::api::CForecastRunner::SForecast forecastJob;
std::string message{
"p{\"duration\":" + std::to_string(3 * ml::core::constants::WEEK) +
",\"forecast_id\": \"42\",\"create_time\": \"1511370819\",\"min_available_disk_space\": 100000}"};
BOOST_TEST_REQUIRE(ml::api::CForecastRunner::parseAndValidateForecastRequest(
message, forecastJob, 1400000000));
BOOST_REQUIRE_EQUAL(100000, forecastJob.s_MinForecastAvailableDiskSpace);
std::string message2{"p{\"duration\":" + std::to_string(3 * ml::core::constants::WEEK) +
",\"forecast_id\": \"42\",\"create_time\": \"1511370819\"}"};
BOOST_TEST_REQUIRE(ml::api::CForecastRunner::parseAndValidateForecastRequest(
message2, forecastJob, 1400000000));
BOOST_REQUIRE_EQUAL(ml::api::CForecastRunner::DEFAULT_MIN_FORECAST_AVAILABLE_DISK_SPACE,
forecastJob.s_MinForecastAvailableDiskSpace);
}
BOOST_AUTO_TEST_CASE(testValidateProvidedMaxMemoryLimit) {
ml::api::CForecastRunner::SForecast forecastJob;
std::string message{
"p{\"duration\":" + std::to_string(3 * ml::core::constants::WEEK) +
",\"forecast_id\": \"42\",\"create_time\": \"1511370819\",\"max_model_memory\": 10000000}"};
BOOST_TEST_REQUIRE(ml::api::CForecastRunner::parseAndValidateForecastRequest(
message, forecastJob, 1400000000));
BOOST_REQUIRE_EQUAL(10000000, forecastJob.s_MaxForecastModelMemory);
std::string message2{"p{\"duration\":" + std::to_string(3 * ml::core::constants::WEEK) +
",\"forecast_id\": \"42\",\"create_time\": \"1511370819\"}"};
BOOST_TEST_REQUIRE(ml::api::CForecastRunner::parseAndValidateForecastRequest(
message2, forecastJob, 1400000000));
BOOST_REQUIRE_EQUAL(ml::api::CForecastRunner::DEFAULT_MAX_FORECAST_MODEL_MEMORY,
forecastJob.s_MaxForecastModelMemory);
}
BOOST_AUTO_TEST_CASE(testValidateProvidedTooLargeMaxMemoryLimit) {
ml::api::CForecastRunner::SForecast forecastJob;
std::string message("p{\"duration\":" + std::to_string(3 * ml::core::constants::WEEK) +
",\"forecast_id\": \"42\",\"create_time\": \"1511370819\",\"max_model_memory\":" +
std::to_string(524288000ull + 10ull) + "}");
// larger than the most we can persist to disk should cause a failure
BOOST_TEST_REQUIRE(ml::api::CForecastRunner::parseAndValidateForecastRequest(
message, forecastJob, 1400000000,
std::numeric_limits<std::size_t>::max() / 2,
[](const ml::api::CForecastRunner::SForecast&,
const std::string&) { return; }) == false);
std::string message2(
"p{\"duration\":" + std::to_string(3 * ml::core::constants::WEEK) +
",\"forecast_id\": \"42\",\"create_time\": \"1511370819\",\"max_model_memory\":31457280}");
// Larger than 40% of the configured job memory should fail
BOOST_TEST_REQUIRE(ml::api::CForecastRunner::parseAndValidateForecastRequest(
message2, forecastJob, 1400000000, 31457280ull,
[](const ml::api::CForecastRunner::SForecast&,
const std::string&) { return; }) == false);
// Less than 40% of the configured job memory should NOT fail
BOOST_TEST_REQUIRE(ml::api::CForecastRunner::parseAndValidateForecastRequest(
message2, forecastJob, 1400000000, static_cast<std::size_t>(31457280ull * 3),
[](const ml::api::CForecastRunner::SForecast&, const std::string&) {
return;
}));
}
BOOST_AUTO_TEST_CASE(testSufficientDiskSpace) {
// These tests could theoretically fail based on environmental factors, but
// it's unlikely - they are saying the current directory must have at least
// 1 byte free disk space and less than 16 exabytes free
BOOST_REQUIRE_EQUAL(
true, ml::api::CForecastRunner::sufficientAvailableDiskSpace(1, "."));
BOOST_REQUIRE_EQUAL(false, ml::api::CForecastRunner::sufficientAvailableDiskSpace(
std::numeric_limits<std::size_t>::max(), "."));
}
BOOST_AUTO_TEST_SUITE_END()