lib/api/unittest/CDataFrameAnalysisSpecificationTest.cc (396 lines of code) (raw):

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the following additional limitation. Functionality enabled by the
 * files subject to the Elastic License 2.0 may only be used in production when
 * invoked by an Elasticsearch process with a license key installed that permits
 * use of machine learning features. You may not use this file except in
 * compliance with the Elastic License 2.0 and the foregoing additional
 * limitation.
 */

#include <core/CContainerPrinter.h>
#include <core/CDataFrame.h>
#include <core/CLogger.h>

#include <api/CDataFrameAnalysisRunner.h>
#include <api/CDataFrameAnalysisSpecification.h>
#include <api/CDataFrameAnalysisSpecificationJsonWriter.h>
#include <api/CDataFrameOutliersRunner.h>
#include <api/CDataFrameTrainBoostedTreeClassifierRunner.h>
#include <api/CDataFrameTrainBoostedTreeRegressionRunner.h>

#include <test/CDataFrameAnalysisSpecificationFactory.h>
#include <test/CTestTmpDir.h>

#include "CDataFrameMockAnalysisRunner.h"

#include <boost/test/unit_test.hpp>

#include <chrono>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

BOOST_AUTO_TEST_SUITE(CDataFrameAnalysisSpecificationTest)

using namespace ml;

namespace {
using TStrVec = std::vector<std::string>;
using TRunnerFactoryUPtr = std::unique_ptr<api::CDataFrameAnalysisRunnerFactory>;
using TRunnerFactoryUPtrVec = std::vector<TRunnerFactoryUPtr>;

// Build an outlier detection specification for testTempDirDiskUsage.
//
// \param[in] tempDirPathSet If true the test temporary directory is written as
// the temp dir path, otherwise an empty path is written.
// \param[in] diskUsageAllowed The value written for disk usage allowed.
std::string createSpecJsonForTempDirDiskUsageTest(bool tempDirPathSet, bool diskUsageAllowed) {
    std::string tempDir = tempDirPathSet ? test::CTestTmpDir::tmpDir() : "";
    return api::CDataFrameAnalysisSpecificationJsonWriter::jsonString(
        "testJob", 100, 3, 500000, 1, "", {}, diskUsageAllowed, tempDir, "",
        "outlier_detection", "");
}
}

BOOST_AUTO_TEST_CASE(testCreate) {

    // This test focuses on checking the validation code we apply to the object
    // rather than the JSON parsing so we don't bother with random fuzzing of the
    // input string and simply check validation for each field.

    TStrVec errors;
    auto errorHandler = [&errors](std::string error) {
        errors.push_back(std::move(error));
    };
    core::CLogger::CScopeSetFatalErrorHandler scope{errorHandler};

    // Fresh set of outlier detection, regression and classification runner
    // factories for each specification under test.
    auto runnerFactories = []() {
        TRunnerFactoryUPtr outliers{std::make_unique<api::CDataFrameOutliersRunnerFactory>()};
        TRunnerFactoryUPtr regression{
            std::make_unique<api::CDataFrameTrainBoostedTreeRegressionRunnerFactory>()};
        TRunnerFactoryUPtr classification{
            std::make_unique<api::CDataFrameTrainBoostedTreeClassifierRunnerFactory>()};
        TRunnerFactoryUPtrVec factories;
        factories.push_back(std::move(outliers));
        factories.push_back(std::move(regression));
        factories.push_back(std::move(classification));
        return factories;
    };

    // Build a specification JSON string. Each empty argument omits the
    // corresponding field so the validation of every field can be exercised
    // in isolation. An empty name deliberately yields a malformed "analysis"
    // object (it then starts with a comma).
    // NOTE(review): "disk_usage_allowed" ends up inside the "analysis" object
    // (the first closing brace closes "analysis") - confirm this is intended.
    auto jsonSpec = [](const std::string& jobId, const std::string& rows,
                       const std::string& cols, const std::string& memory,
                       const std::string& threads, const std::string& resultsField,
                       const TStrVec& categoricalFields, const std::string& name,
                       const std::string& parameters = "", const std::string& junk = "") {
        std::ostringstream result;
        result << "{\n";
        if (jobId.empty() == false) {
            result << " \"job_id\": \"" << jobId << "\",\n";
        }
        if (rows.empty() == false) {
            result << " \"rows\": " << rows << ",\n";
        }
        if (cols.empty() == false) {
            result << " \"cols\": " << cols << ",\n";
        }
        if (memory.empty() == false) {
            result << " \"memory_limit\": " << memory << ",\n";
        }
        if (threads.empty() == false) {
            result << " \"threads\": " << threads << ",\n";
        }
        if (resultsField.empty() == false) {
            result << " \"results_field\": \"" << resultsField << "\",\n";
        }
        if (categoricalFields.empty() == false) {
            result << " \"categorical_fields\": [";
            result << " \"" << categoricalFields[0] << "\"";
            for (std::size_t i = 1; i < categoricalFields.size(); ++i) {
                result << ", \"" << categoricalFields[i] << "\"";
            }
            result << " ],\n";
        }
        if (junk.empty() == false) {
            result << " \"" << junk << "\": 4,\n";
        }
        result << " \"analysis\": {\n";
        if (name.empty() == false) {
            result << " \"name\": \"" << name << "\"";
        }
        if (parameters.empty() == false) {
            result << ",\n \"parameters\": " << parameters << ",\n";
        } else {
            result << ",\n";
        }
        result << " \"disk_usage_allowed\": true \n}\n}";
        return result.str();
    };

    // Construct a specification from exactly the supplied JSON using
    // runnerFactories() and return true if at least one error was reported.
    // The caller asserts on the result so failures point at the offending case,
    // and the traced JSON is always the JSON that was validated.
    auto reportsErrors = [&](const std::string& json) {
        LOG_TRACE(<< json);
        errors.clear();
        api::CDataFrameAnalysisSpecification spec{runnerFactories(), json};
        LOG_DEBUG(<< errors);
        return errors.empty() == false;
    };

    LOG_DEBUG(<< "Valid input");
    {
        std::string json{jsonSpec("foo", "1000", "20", "100000", "2", "custom_ml",
                                  {}, "outlier_detection")};
        LOG_TRACE(<< json);
        api::CDataFrameAnalysisSpecification spec{runnerFactories(), json};
        BOOST_REQUIRE_EQUAL(std::string{"foo"}, spec.jobId());
        BOOST_REQUIRE_EQUAL(std::size_t{1000}, spec.numberRows());
        BOOST_REQUIRE_EQUAL(std::size_t{20}, spec.numberColumns());
        BOOST_REQUIRE_EQUAL(std::size_t{100000}, spec.memoryLimit());
        BOOST_REQUIRE_EQUAL(std::size_t{2}, spec.numberThreads());
        BOOST_REQUIRE_EQUAL(std::string("custom_ml"), spec.resultsField());
        BOOST_TEST_REQUIRE(spec.categoricalFieldNames().empty());
    }
    {
        std::string json{jsonSpec("bar", "1000", "20", "100000", "2", "custom_ml",
                                  {"x", "y"}, "outlier_detection")};
        LOG_TRACE(<< json);
        api::CDataFrameAnalysisSpecification spec{runnerFactories(), json};
        BOOST_REQUIRE_EQUAL(std::string{"bar"}, spec.jobId());
        BOOST_REQUIRE_EQUAL(std::size_t{1000}, spec.numberRows());
        BOOST_REQUIRE_EQUAL(std::size_t{20}, spec.numberColumns());
        BOOST_REQUIRE_EQUAL(std::size_t{100000}, spec.memoryLimit());
        BOOST_REQUIRE_EQUAL(std::size_t{2}, spec.numberThreads());
        BOOST_REQUIRE_EQUAL(std::string("custom_ml"), spec.resultsField());
        BOOST_REQUIRE_EQUAL(std::string("[x, y]"),
                            core::CContainerPrinter::print(spec.categoricalFieldNames()));
    }

    LOG_DEBUG(<< "Bad input");
    // Missing rows, cols, memory limit, threads and analysis name.
    BOOST_TEST_REQUIRE(reportsErrors(
        jsonSpec("foo", "", "20", "100000", "2", "ml", {}, "outlier_detection")));
    BOOST_TEST_REQUIRE(reportsErrors(
        jsonSpec("foo", "1000", "", "100000", "2", "ml", {}, "outlier_detection")));
    BOOST_TEST_REQUIRE(reportsErrors(
        jsonSpec("foo", "1000", "20", "", "2", "ml", {}, "outlier_detection")));
    BOOST_TEST_REQUIRE(reportsErrors(
        jsonSpec("foo", "1000", "20", "100000", "", "ml", {}, "outlier_detection")));
    BOOST_TEST_REQUIRE(reportsErrors(jsonSpec("foo", "1000", "20", "100000", "2", "ml", {}, "")));
    // Out of range or wrongly typed values and an unknown analysis name.
    BOOST_TEST_REQUIRE(reportsErrors(
        jsonSpec("foo", "-3", "20", "100000", "2", "ml", {}, "outlier_detection")));
    BOOST_TEST_REQUIRE(reportsErrors(
        jsonSpec("foo", "1000", "0", "100000", "2", "ml", {}, "outlier_detection")));
    BOOST_TEST_REQUIRE(reportsErrors(
        jsonSpec("foo", "1000", "20", "\"ZZ\"", "2", "ml", {}, "outlier_detection")));
    BOOST_TEST_REQUIRE(reportsErrors(
        jsonSpec("foo", "1000", "20", "100000", "-1", "ml", {}, "outlier_detection")));
    BOOST_TEST_REQUIRE(reportsErrors(
        jsonSpec("foo", "100", "20", "100000", "2", "ml", {}, "outl1ers")));
    // Categorical field names which are not strings.
    {
        std::string jsonSpecStr{"{\n"
                                " \"job_id\": \"foo\",\n"
                                " \"rows\": 1000,\n"
                                " \"cols\": 20,\n"
                                " \"memory_limit\": 100000,\n"
                                " \"threads\": 2,\n"
                                " \"results_field\": \"ml\",\n"
                                " \"categorical_fields\": [ 2, 1 ],\n"
                                " \"analysis\": {\n"
                                " \"name\": \"outlier_detection\"\n"
                                " }\n"
                                "}"};
        BOOST_TEST_REQUIRE(reportsErrors(jsonSpecStr));
    }

    LOG_DEBUG(<< "Invalid number neighbours");
    BOOST_TEST_REQUIRE(reportsErrors(jsonSpec("foo", "100", "20", "100000", "2", "ml", {},
                                              "outlier_detection", "{\"n_neighbors\": -1}")));

    LOG_DEBUG(<< "Invalid method");
    BOOST_TEST_REQUIRE(reportsErrors(jsonSpec("foo", "100", "20", "100000", "2", "ml", {},
                                              "outlier_detection", "{\"method\": \"lofe\"}")));

    LOG_DEBUG(<< "Invalid feature influence");
    BOOST_TEST_REQUIRE(reportsErrors(jsonSpec("foo", "100", "20", "100000", "2", "ml", {}, "outlier_detection",
                                              "{\"compute_feature_influence\": 1}")));

    LOG_DEBUG(<< "Misspelled feature influence parameter");
    BOOST_TEST_REQUIRE(reportsErrors(jsonSpec("foo", "100", "20", "100000", "2", "ml", {}, "outlier_detection",
                                              "{\"compute_feature_influences\": true}")));

    LOG_DEBUG(<< "Extra junk");
    BOOST_TEST_REQUIRE(reportsErrors(jsonSpec("foo", "1000", "2", "100000", "2", "ml",
                                              {}, "outlier_detection", "", "threeds")));

    // The following cases use the specification's default runner factories.

    LOG_DEBUG(<< "Classification with numeric target");
    {
        errors.clear();
        std::string parameters{"{\"dependent_variable\": \"class\"}"};
        api::CDataFrameAnalysisSpecification spec{
            api::CDataFrameAnalysisSpecificationJsonWriter::jsonString(
                "testJob", 10000, 5, 100000000, 1, "", {}, true,
                test::CTestTmpDir::tmpDir(), "", "classification", parameters)};
        LOG_DEBUG(<< errors);
        BOOST_TEST_REQUIRE(errors.size() > 0);
    }

    LOG_DEBUG(<< "Regression with categorical target");
    {
        errors.clear();
        std::string parameters{"{\"dependent_variable\": \"value\"}"};
        api::CDataFrameAnalysisSpecification spec{
            api::CDataFrameAnalysisSpecificationJsonWriter::jsonString(
                "testJob", 10000, 5, 100000000, 1, "", {"value"}, true,
                test::CTestTmpDir::tmpDir(), "", "regression", parameters)};
        LOG_DEBUG(<< errors);
        BOOST_TEST_REQUIRE(errors.size() > 0);
    }

    LOG_DEBUG(<< "Missing field value is numeric");
    {
        errors.clear();
        std::string parameters{"{\"dependent_variable\": \"value\"}"};
        api::CDataFrameAnalysisSpecification spec{
            api::CDataFrameAnalysisSpecificationJsonWriter::jsonString(
                "testJob", 10000, 5, 100000000, 1, "42", {}, true,
                test::CTestTmpDir::tmpDir(), "", "regression", parameters)};
        LOG_DEBUG(<< errors);
        BOOST_TEST_REQUIRE(errors.size() > 0);
    }

    LOG_DEBUG(<< "Invalid number of class weights");
    {
        errors.clear();
        test::CDataFrameAnalysisSpecificationFactory specFactory;
        // Three class weights are supplied for a two class problem.
        auto spec = specFactory.rows(5)
                        .predictionCategoricalFieldNames({"f1", "target"})
                        .numberClasses(2)
                        .classificationWeights({{"a", 0.1}, {"b", 0.4}, {"c", 0.5}})
                        .predictionSpec(test::CDataFrameAnalysisSpecificationFactory::classification(),
                                        "target");
        LOG_DEBUG(<< errors);
        BOOST_TEST_REQUIRE(errors.size() > 0);
    }
}

BOOST_AUTO_TEST_CASE(testRunAnalysis) {

    // Check progress is monotonic, never exceeds one and is exactly one once
    // the analysis has finished.

    auto testFactory = []() {
        TRunnerFactoryUPtr factory{std::make_unique<CDataFrameMockAnalysisRunnerFactory>()};
        TRunnerFactoryUPtrVec factories;
        factories.push_back(std::move(factory));
        return factories;
    };

    std::string jsonSpec{api::CDataFrameAnalysisSpecificationJsonWriter::jsonString(
        "testJob", 100, 10, 1000, 1, "", {}, true, test::CTestTmpDir::tmpDir(), "", "test", "")};

    for (std::size_t i = 0; i < 10; ++i) {
        api::CDataFrameAnalysisSpecification spec{testFactory(), jsonSpec};

        auto frameAndDirectory = core::makeMainStorageDataFrame(10);
        auto frame = std::move(frameAndDirectory.first);

        api::CDataFrameAnalysisRunner* runner{spec.runner()};
        BOOST_TEST_REQUIRE(runner != nullptr);

        runner->run(*frame);

        double lastProgress{runner->instrumentation().progress()};
        while (runner->instrumentation().finished() == false) {
            std::this_thread::sleep_for(std::chrono::milliseconds(20));
            // Read progress once so both checks and the stored value agree.
            // The upper bound is inclusive because finished() is polled
            // separately and progress may already have reached one.
            double progress{runner->instrumentation().progress()};
            LOG_TRACE(<< "progress = " << progress);
            BOOST_TEST_REQUIRE(progress >= lastProgress);
            BOOST_TEST_REQUIRE(progress <= 1.0);
            lastProgress = progress;
        }

        LOG_TRACE(<< "final progress = " << lastProgress);
        BOOST_REQUIRE_EQUAL(1.0, runner->instrumentation().progress());
    }
}

BOOST_AUTO_TEST_CASE(testTempDirDiskUsage) {

    // Check that allowing disk usage without a temporary directory is reported
    // as an input error and that the other combinations are accepted.

    TStrVec errors;
    std::mutex errorsMutex;
    auto errorHandler = [&errors, &errorsMutex](std::string error) {
        std::lock_guard<std::mutex> lock{errorsMutex};
        errors.push_back(std::move(error));
    };
    core::CLogger::CScopeSetFatalErrorHandler scope{errorHandler};

    // No temp dir given, disk usage allowed
    {
        errors.clear();
        std::string jsonSpec{createSpecJsonForTempDirDiskUsageTest(false, true)};
        api::CDataFrameAnalysisSpecification spec{jsonSpec};

        // single error is registered that temp dir is empty
        BOOST_REQUIRE_EQUAL(std::size_t{1}, errors.size());
        BOOST_TEST_REQUIRE(errors[0].find("Input error: temporary directory path should"
                                          " be explicitly set if disk usage is allowed!") !=
                           std::string::npos);
    }

    // No temp dir given, no disk usage allowed
    {
        errors.clear();
        std::string jsonSpec{createSpecJsonForTempDirDiskUsageTest(false, false)};
        api::CDataFrameAnalysisSpecification spec{jsonSpec};

        // no error should be registered
        BOOST_REQUIRE_EQUAL(std::size_t{0}, errors.size());
    }

    // Temp dir given and disk usage allowed
    {
        errors.clear();
        std::string jsonSpec{createSpecJsonForTempDirDiskUsageTest(true, true)};
        api::CDataFrameAnalysisSpecification spec{jsonSpec};

        // no error should be registered
        BOOST_REQUIRE_EQUAL(std::size_t{0}, errors.size());
    }
}

BOOST_AUTO_TEST_SUITE_END()