lib/api/unittest/CDataFrameAnalysisSpecificationTest.cc (396 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <core/CContainerPrinter.h>
#include <core/CDataFrame.h>
#include <core/CLogger.h>

#include <api/CDataFrameAnalysisRunner.h>
#include <api/CDataFrameAnalysisSpecification.h>
#include <api/CDataFrameAnalysisSpecificationJsonWriter.h>
#include <api/CDataFrameOutliersRunner.h>
#include <api/CDataFrameTrainBoostedTreeClassifierRunner.h>
#include <api/CDataFrameTrainBoostedTreeRegressionRunner.h>

#include <test/CDataFrameAnalysisSpecificationFactory.h>
#include <test/CTestTmpDir.h>

#include "CDataFrameMockAnalysisRunner.h"

#include <boost/test/unit_test.hpp>

#include <chrono>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
// Every test case below is registered in this suite, which is closed by
// BOOST_AUTO_TEST_SUITE_END() at the end of the file.
BOOST_AUTO_TEST_SUITE(CDataFrameAnalysisSpecificationTest)
// Lets the code below write api::, core:: and test:: without an ml:: qualifier
// (assuming these namespaces are nested in ml).
using namespace ml;
namespace {
// Shorthand types shared by the test cases in this file.
using TStrVec = std::vector<std::string>;
using TRunnerFactoryUPtr = std::unique_ptr<api::CDataFrameAnalysisRunnerFactory>;
using TRunnerFactoryUPtrVec = std::vector<TRunnerFactoryUPtr>;

// Create the "outlier_detection" specification JSON used by the temporary
// directory disk usage test.
//
// tempDirPathSet: when true the temporary directory argument is the value of
// test::CTestTmpDir::tmpDir(), otherwise it is the empty string.
// diskUsageAllowed: forwarded unchanged to the JSON writer.
//
// Returns whatever CDataFrameAnalysisSpecificationJsonWriter::jsonString
// produces for these arguments.
std::string createSpecJsonForTempDirDiskUsageTest(bool tempDirPathSet, bool diskUsageAllowed) {
    std::string tempDir;
    if (tempDirPathSet) {
        tempDir = test::CTestTmpDir::tmpDir();
    }
    return api::CDataFrameAnalysisSpecificationJsonWriter::jsonString(
        "testJob", 100, 3, 500000, 1, "", {}, diskUsageAllowed, tempDir, "",
        "outlier_detection", "");
}
}
BOOST_AUTO_TEST_CASE(testCreate) {
    // This test focuses on checking the validation code we apply to the object
    // rather than the JSON parsing so we don't bother with random fuzzing of the
    // input string and simply check validation for each field.
    //
    // Errors passed to the handler below are collected in "errors" so each case
    // can check whether the specification it built reported a problem.

    TStrVec errors;
    auto errorHandler = [&errors](std::string error) { errors.push_back(error); };
    // Presumably installs errorHandler for as long as "scope" is alive.
    core::CLogger::CScopeSetFatalErrorHandler scope{errorHandler};

    // Returns a new vector holding one outliers, one regression and one
    // classification runner factory. A new vector is needed per specification
    // because it is handed over by value.
    auto runnerFactories = []() {
        TRunnerFactoryUPtrVec factories;
        factories.push_back(std::make_unique<api::CDataFrameOutliersRunnerFactory>());
        factories.push_back(
            std::make_unique<api::CDataFrameTrainBoostedTreeRegressionRunnerFactory>());
        factories.push_back(
            std::make_unique<api::CDataFrameTrainBoostedTreeClassifierRunnerFactory>());
        return factories;
    };

    // Hand-rolls a specification document. Every argument is written verbatim,
    // so numeric fields are passed as strings which lets a case supply values
    // that are not numbers. An empty argument omits the corresponding field.
    // "junk", if supplied, is written as an extra top-level field with value 4.
    auto jsonSpec = [](const std::string& jobId, const std::string& rows,
                       const std::string& cols, const std::string& memory,
                       const std::string& threads, const std::string& resultsField,
                       const TStrVec& categoricalFields, const std::string& name,
                       const std::string& parameters = "", const std::string& junk = "") {
        std::ostringstream result;
        result << "{\n";
        if (jobId.empty() == false) {
            result << " \"job_id\": \"" << jobId << "\",\n";
        }
        if (rows.empty() == false) {
            result << " \"rows\": " << rows << ",\n";
        }
        if (cols.empty() == false) {
            result << " \"cols\": " << cols << ",\n";
        }
        if (memory.empty() == false) {
            result << " \"memory_limit\": " << memory << ",\n";
        }
        if (threads.empty() == false) {
            result << " \"threads\": " << threads << ",\n";
        }
        if (resultsField.empty() == false) {
            result << " \"results_field\": \"" << resultsField << "\",\n";
        }
        if (categoricalFields.empty() == false) {
            result << " \"categorical_fields\": [";
            // The first name is introduced by a space, later ones by a comma.
            const char* separator{" \""};
            for (const auto& field : categoricalFields) {
                result << separator << field << "\"";
                separator = ", \"";
            }
            result << " ],\n";
        }
        if (junk.empty() == false) {
            result << " \"" << junk << "\": 4,\n";
        }
        result << " \"analysis\": {\n";
        if (name.empty() == false) {
            result << " \"name\": \"" << name << "\"";
        }
        // NOTE(review): the separating comma is written even when "name" is
        // empty, so that case produces malformed JSON. The "missing name" case
        // below may therefore be rejected by the parser rather than by the
        // validation of required fields - confirm this is intended.
        if (parameters.empty() == false) {
            result << ",\n \"parameters\": " << parameters << ",\n";
        } else {
            result << ",\n";
        }
        // NOTE(review): "disk_usage_allowed" is written inside the "analysis"
        // object (the first closing brace ends "analysis") - confirm this is
        // where it is meant to be.
        result << " \"disk_usage_allowed\": true \n}\n}";
        return result.str();
    };

    // Builds a specification from "json" with the factories returned by
    // runnerFactories() and returns true if at least one error was collected
    // while doing so. The document which is traced is exactly the one which is
    // tested.
    auto isRejected = [&errors, &runnerFactories](const std::string& json) {
        LOG_TRACE(<< json);
        errors.clear();
        api::CDataFrameAnalysisSpecification spec{runnerFactories(), json};
        LOG_DEBUG(<< errors);
        return errors.size() > 0;
    };

    // As isRejected, but uses the constructor which takes only the JSON.
    auto isRejectedWithDefaultFactories = [&errors](const std::string& json) {
        LOG_TRACE(<< json);
        errors.clear();
        api::CDataFrameAnalysisSpecification spec{json};
        LOG_DEBUG(<< errors);
        return errors.size() > 0;
    };

    LOG_DEBUG(<< "Valid input");
    {
        const std::string json{jsonSpec("foo", "1000", "20", "100000", "2",
                                        "custom_ml", {}, "outlier_detection")};
        LOG_TRACE(<< json);
        api::CDataFrameAnalysisSpecification spec{runnerFactories(), json};
        BOOST_REQUIRE_EQUAL(std::string{"foo"}, spec.jobId());
        BOOST_REQUIRE_EQUAL(std::size_t{1000}, spec.numberRows());
        BOOST_REQUIRE_EQUAL(std::size_t{20}, spec.numberColumns());
        BOOST_REQUIRE_EQUAL(std::size_t{100000}, spec.memoryLimit());
        BOOST_REQUIRE_EQUAL(std::size_t{2}, spec.numberThreads());
        BOOST_REQUIRE_EQUAL(std::string("custom_ml"), spec.resultsField());
        BOOST_TEST_REQUIRE(spec.categoricalFieldNames().empty());
    }
    {
        const std::string json{jsonSpec("bar", "1000", "20", "100000", "2", "custom_ml",
                                        {"x", "y"}, "outlier_detection")};
        LOG_TRACE(<< json);
        api::CDataFrameAnalysisSpecification spec{runnerFactories(), json};
        BOOST_REQUIRE_EQUAL(std::string{"bar"}, spec.jobId());
        BOOST_REQUIRE_EQUAL(std::size_t{1000}, spec.numberRows());
        BOOST_REQUIRE_EQUAL(std::size_t{20}, spec.numberColumns());
        BOOST_REQUIRE_EQUAL(std::size_t{100000}, spec.memoryLimit());
        BOOST_REQUIRE_EQUAL(std::size_t{2}, spec.numberThreads());
        BOOST_REQUIRE_EQUAL(std::string("custom_ml"), spec.resultsField());
        BOOST_REQUIRE_EQUAL(std::string("[x, y]"),
                            core::CContainerPrinter::print(spec.categoricalFieldNames()));
    }

    LOG_DEBUG(<< "Bad input");
    // "rows" omitted.
    BOOST_TEST_REQUIRE(isRejected(
        jsonSpec("foo", "", "20", "100000", "2", "ml", {}, "outlier_detection")));
    // "cols" omitted.
    BOOST_TEST_REQUIRE(isRejected(
        jsonSpec("foo", "1000", "", "100000", "2", "ml", {}, "outlier_detection")));
    // "memory_limit" omitted.
    BOOST_TEST_REQUIRE(isRejected(
        jsonSpec("foo", "1000", "20", "", "2", "ml", {}, "outlier_detection")));
    // "threads" omitted.
    BOOST_TEST_REQUIRE(isRejected(
        jsonSpec("foo", "1000", "20", "100000", "", "ml", {}, "outlier_detection")));
    // Analysis "name" omitted.
    BOOST_TEST_REQUIRE(
        isRejected(jsonSpec("foo", "1000", "20", "100000", "2", "ml", {}, "")));
    // Negative "rows".
    BOOST_TEST_REQUIRE(isRejected(
        jsonSpec("foo", "-3", "20", "100000", "2", "ml", {}, "outlier_detection")));
    // Zero "cols".
    BOOST_TEST_REQUIRE(isRejected(
        jsonSpec("foo", "1000", "0", "100000", "2", "ml", {}, "outlier_detection")));
    // "memory_limit" is a JSON string rather than a number.
    BOOST_TEST_REQUIRE(isRejected(
        jsonSpec("foo", "1000", "20", "\"ZZ\"", "2", "ml", {}, "outlier_detection")));
    // Negative "threads".
    BOOST_TEST_REQUIRE(isRejected(
        jsonSpec("foo", "1000", "20", "100000", "-1", "ml", {}, "outlier_detection")));
    // Analysis name for which, presumably, no runner factory is registered.
    BOOST_TEST_REQUIRE(
        isRejected(jsonSpec("foo", "100", "20", "100000", "2", "ml", {}, "outl1ers")));
    // "categorical_fields" holds numbers rather than strings.
    BOOST_TEST_REQUIRE(isRejected(std::string{"{\n"
                                              " \"job_id\": \"foo\",\n"
                                              " \"rows\": 1000,\n"
                                              " \"cols\": 20,\n"
                                              " \"memory_limit\": 100000,\n"
                                              " \"threads\": 2,\n"
                                              " \"results_field\": \"ml\",\n"
                                              " \"categorical_fields\": [ 2, 1 ],\n"
                                              " \"analysis\": {\n"
                                              " \"name\": \"outlier_detection\"\n"
                                              " }\n"
                                              "}"}));

    LOG_DEBUG(<< "Invalid number neighbours");
    BOOST_TEST_REQUIRE(isRejected(jsonSpec("foo", "100", "20", "100000", "2", "ml", {},
                                           "outlier_detection", "{\"n_neighbors\": -1}")));

    LOG_DEBUG(<< "Invalid method");
    BOOST_TEST_REQUIRE(isRejected(jsonSpec("foo", "100", "20", "100000", "2", "ml", {},
                                           "outlier_detection", "{\"method\": \"lofe\"}")));

    LOG_DEBUG(<< "Invalid feature influence");
    // The value is a number rather than a boolean.
    BOOST_TEST_REQUIRE(isRejected(jsonSpec("foo", "100", "20", "100000", "2", "ml",
                                           {}, "outlier_detection",
                                           "{\"compute_feature_influence\": 1}")));

    LOG_DEBUG(<< "Invalid feature influence");
    // The value is a boolean but the parameter name has a trailing "s";
    // presumably this checks that a misspelt parameter name is rejected.
    BOOST_TEST_REQUIRE(isRejected(jsonSpec("foo", "100", "20", "100000", "2", "ml",
                                           {}, "outlier_detection",
                                           "{\"compute_feature_influences\": true}")));

    LOG_DEBUG(<< "Extra junk");
    BOOST_TEST_REQUIRE(isRejected(jsonSpec("foo", "1000", "2", "100000", "2", "ml", {},
                                           "outlier_detection", "", "threeds")));

    LOG_DEBUG(<< "Classification with numeric target");
    {
        const std::string parameters{"{\"dependent_variable\": \"class\"}"};
        BOOST_TEST_REQUIRE(isRejectedWithDefaultFactories(
            api::CDataFrameAnalysisSpecificationJsonWriter::jsonString(
                "testJob", 10000, 5, 100000000, 1, "", {}, true,
                test::CTestTmpDir::tmpDir(), "", "classification", parameters)));
    }

    LOG_DEBUG(<< "Regression with categorical target");
    {
        const std::string parameters{"{\"dependent_variable\": \"value\"}"};
        BOOST_TEST_REQUIRE(isRejectedWithDefaultFactories(
            api::CDataFrameAnalysisSpecificationJsonWriter::jsonString(
                "testJob", 10000, 5, 100000000, 1, "", {"value"}, true,
                test::CTestTmpDir::tmpDir(), "", "regression", parameters)));
    }

    LOG_DEBUG(<< "Missing field value is numeric");
    {
        const std::string parameters{"{\"dependent_variable\": \"value\"}"};
        BOOST_TEST_REQUIRE(isRejectedWithDefaultFactories(
            api::CDataFrameAnalysisSpecificationJsonWriter::jsonString(
                "testJob", 10000, 5, 100000000, 1, "42", {}, true,
                test::CTestTmpDir::tmpDir(), "", "regression", parameters)));
    }

    LOG_DEBUG(<< "Invalid number of class weights");
    {
        // Three class weights are supplied although the number of classes is
        // set to two.
        errors.clear();
        test::CDataFrameAnalysisSpecificationFactory specFactory;
        auto spec = specFactory.rows(5)
                        .predictionCategoricalFieldNames({"f1", "target"})
                        .numberClasses(2)
                        .classificationWeights({{"a", 0.1}, {"b", 0.4}, {"c", 0.5}})
                        .predictionSpec(test::CDataFrameAnalysisSpecificationFactory::classification(),
                                        "target");
        LOG_DEBUG(<< errors);
        BOOST_TEST_REQUIRE(errors.size() > 0);
    }
}
BOOST_AUTO_TEST_CASE(testRunAnalysis) {
    // Check progress is monotonic and never exceeds one while the analysis is
    // running, and that it is exactly one once the analysis has finished.

    // Returns a new vector holding only the mock runner factory.
    auto testFactory = []() {
        TRunnerFactoryUPtrVec factories;
        factories.push_back(std::make_unique<CDataFrameMockAnalysisRunnerFactory>());
        return factories;
    };

    std::string jsonSpec = api::CDataFrameAnalysisSpecificationJsonWriter::jsonString(
        "testJob", 100, 10, 1000, 1, "", {}, true, test::CTestTmpDir::tmpDir(),
        "", "test", "");

    // Repeat the whole run ten times with a fresh specification and frame.
    for (std::size_t i = 0; i < 10; ++i) {
        api::CDataFrameAnalysisSpecification spec{testFactory(), jsonSpec};

        // Only the frame is moved out; the pair itself stays in scope for the
        // rest of the iteration.
        auto frameAndDirectory = core::makeMainStorageDataFrame(10);
        auto frame = std::move(frameAndDirectory.first);

        api::CDataFrameAnalysisRunner* runner{spec.runner()};
        BOOST_TEST_REQUIRE(runner != nullptr);

        runner->run(*frame);

        double lastProgress{runner->instrumentation().progress()};
        while (runner->instrumentation().finished() == false) {
            std::this_thread::sleep_for(std::chrono::milliseconds(20));
            // Take exactly one sample per iteration so the value which is
            // logged, checked and remembered is the same one. Progress is
            // presumably updated concurrently by the running analysis, so
            // separate reads could each observe a different value.
            const double progress{runner->instrumentation().progress()};
            LOG_TRACE(<< "progress = " << progress);
            BOOST_TEST_REQUIRE(progress >= lastProgress);
            BOOST_TEST_REQUIRE(progress <= 1.0);
            lastProgress = progress;
        }

        // Read the value after the runner reports it has finished rather than
        // reusing the last sample taken while it was still running.
        const double finalProgress{runner->instrumentation().progress()};
        LOG_TRACE(<< "final progress = " << finalProgress);
        BOOST_REQUIRE_EQUAL(1.0, finalProgress);
    }
}
BOOST_AUTO_TEST_CASE(testTempDirDiskUsage) {
    // Check which combinations of "temporary directory set" and "disk usage
    // allowed" cause an error to be reported when a specification is created.

    // Errors passed to the handler are appended to this vector under the mutex.
    std::vector<std::string> reportedErrors;
    std::mutex reportedErrorsMutex;
    auto recordError = [&reportedErrors, &reportedErrorsMutex](std::string error) {
        std::lock_guard<std::mutex> guard{reportedErrorsMutex};
        reportedErrors.push_back(error);
    };
    core::CLogger::CScopeSetFatalErrorHandler scope{recordError};

    // No temp dir given, disk usage allowed
    {
        reportedErrors.clear();
        api::CDataFrameAnalysisSpecification spec{
            createSpecJsonForTempDirDiskUsageTest(false, true)};

        // single error is registered that temp dir is empty
        const std::string expectedMessage{"Input error: temporary directory path should be explicitly set if disk usage is allowed!"};
        BOOST_REQUIRE_EQUAL(std::size_t{1}, reportedErrors.size());
        const bool messageFound{reportedErrors[0].find(expectedMessage) != std::string::npos};
        BOOST_TEST_REQUIRE(messageFound);
    }

    // No temp dir given, no disk usage allowed
    {
        reportedErrors.clear();
        api::CDataFrameAnalysisSpecification spec{
            createSpecJsonForTempDirDiskUsageTest(false, false)};

        // no error should be registered
        BOOST_REQUIRE_EQUAL(std::size_t{0}, reportedErrors.size());
    }

    // Temp dir given and disk usage allowed
    {
        reportedErrors.clear();
        api::CDataFrameAnalysisSpecification spec{
            createSpecJsonForTempDirDiskUsageTest(true, true)};

        // no error should be registered
        BOOST_REQUIRE_EQUAL(std::size_t{0}, reportedErrors.size());
    }
}
BOOST_AUTO_TEST_SUITE_END()