include/model/CForecastDataSink.h (145 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#ifndef INCLUDED_ml_model_CForecastDataSink_h
#define INCLUDED_ml_model_CForecastDataSink_h
#include <core/CBoostJsonConcurrentLineWriter.h>
#include <core/CNonCopyable.h>
#include <core/CoreTypes.h>

#include <model/ImportExport.h>
#include <model/ModelTypes.h>
#include <model/SModelParams.h>

#include <boost/json.hpp>
#include <boost/unordered_set.hpp>

#include <cstddef>
#include <cstdint>
#include <iosfwd>
#include <memory>
#include <string>
#include <vector>
// Shorthand for Boost.JSON. NOTE: this alias is declared at global scope in a
// header, so it is visible in every translation unit that includes this file.
namespace json = boost::json;
namespace ml {
// Forward declarations: the types below are only named in declarations in
// this header (by reference, inside a shared_ptr, or as a parameter of a
// function that is declared but not defined here), so their full definitions
// are not required at this point.
namespace core {
// Taken by reference by the CForecastDataSink constructor.
class CJsonOutputStreamWrapper;
}
namespace maths {
namespace common {
// Held through std::shared_ptr (see CForecastDataSink::TMathsModelPtr).
class CModel;
// Parameter type of CForecastDataSink::push.
struct SErrorBar;
}
}
namespace model {
//! \brief
//! Sink for data created from forecasting
//!
//! NOTE: Except for push, this is a stub implementation and is going
//! to change (e.g. the JSON writing should not happen in this class).
//!
//! The class is final and inherits privately from core::CNonCopyable.
class MODEL_EXPORT CForecastDataSink final : private core::CNonCopyable {
public:
    using TMathsModelPtr = std::shared_ptr<maths::common::CModel>;

    //! NOTE: despite the "UMap" suffix this is an unordered *set* of strings.
    //! The name is part of the public interface and is therefore kept.
    using TStrUMap = boost::unordered_set<std::string>;

    struct SForecastResultSeries;

    //! \brief Wrapper which supports creating a forecast for a single
    //! time series model.
    //!
    //! Move constructible only: the copy operations are deleted and, because
    //! a copy assignment operator is user-declared, no move assignment
    //! operator is implicitly generated either.
    class MODEL_EXPORT CForecastModelWrapper {
    public:
        //! \param feature The feature stored in m_Feature.
        //! \param byFieldValue The by field value stored in m_ByFieldValue.
        //! \param forecastModel Shared pointer to the model, taken by rvalue
        //! reference.
        //! \param firstDataTime Stored in m_FirstDataTime.
        //! \param lastDataTime Stored in m_LastDataTime.
        //! NOTE(review): the parameter-to-member mapping is inferred from the
        //! names; the definition is not part of this header.
        CForecastModelWrapper(model_t::EFeature feature,
                              const std::string& byFieldValue,
                              TMathsModelPtr&& forecastModel,
                              core_t::TTime firstDataTime,
                              core_t::TTime lastDataTime);

        CForecastModelWrapper(CForecastModelWrapper&& other) = default;

        CForecastModelWrapper(const CForecastModelWrapper&) = delete;
        CForecastModelWrapper& operator=(const CForecastModelWrapper&) = delete;

        //! Run a forecast for the wrapped model.
        //!
        //! \param series The series description this model belongs to.
        //! \param startTime Start of the forecast interval.
        //! \param endTime End of the forecast interval.
        //! \param boundsPercentile Percentile used for the forecast bounds.
        //! \param sink The sink passed in by the caller.
        //! \param message String passed by non-const reference.
        //! \return A bool status.
        //! NOTE(review): presumably results are pushed to \p sink, \p message
        //! receives diagnostic text and the return value signals success -
        //! confirm against the implementation.
        bool forecast(const SForecastResultSeries& series,
                      core_t::TTime startTime,
                      core_t::TTime endTime,
                      double boundsPercentile,
                      CForecastDataSink& sink,
                      std::string& message) const;

    private:
        model_t::EFeature m_Feature;
        std::string m_ByFieldValue;
        TMathsModelPtr m_ForecastModel;
        core_t::TTime m_FirstDataTime;
        core_t::TTime m_LastDataTime;
    };

    //! Everything that defines 1 series of forecasts
    //!
    //! Move constructible only (copy construction and copy assignment are
    //! deleted).
    struct MODEL_EXPORT SForecastResultSeries {
        //! NOTE(review): this single argument constructor is not explicit, so
        //! an SModelParams converts implicitly to SForecastResultSeries.
        SForecastResultSeries(const SModelParams& modelParams);

        SForecastResultSeries(SForecastResultSeries&&) = default;

        SForecastResultSeries(const SForecastResultSeries&) = delete;
        SForecastResultSeries& operator=(const SForecastResultSeries&) = delete;

        SModelParams s_ModelParams;
        int s_DetectorIndex;
        //! The per time series model wrappers of this series.
        std::vector<CForecastModelWrapper> s_ToForecast;
        std::string s_ToForecastPersisted;
        std::string s_PartitionFieldName;
        std::string s_PartitionFieldValue;
        std::string s_ByFieldName;
        double s_MinimumSeasonalVarianceScale;
    };

    //! \brief Data describing prerequisites prior predictions
    //!
    //! All members carry default initializers so that a default constructed
    //! instance never holds indeterminate values. The struct remains an
    //! aggregate (C++14 and later), so brace initialization with explicit
    //! values keeps working.
    struct MODEL_EXPORT SForecastModelPrerequisites {
        std::size_t s_NumberOfModels = 0;
        std::size_t s_NumberOfForecastableModels = 0;
        std::size_t s_MemoryUsageForDetector = 0;
        bool s_IsPopulation = false;
        bool s_IsSupportedFunction = false;
    };

private:
    //! String constants defined out of line.
    //! NOTE(review): presumably the field names used in the JSON output -
    //! confirm against the implementation file.
    static const std::string JOB_ID;
    static const std::string FORECAST_ID;
    static const std::string FORECAST_ALIAS;
    static const std::string DETECTOR_INDEX;
    static const std::string MODEL_FORECAST;
    static const std::string MODEL_FORECAST_STATS;
    static const std::string PARTITION_FIELD_NAME;
    static const std::string PARTITION_FIELD_VALUE;
    static const std::string FEATURE;
    static const std::string BY_FIELD_NAME;
    static const std::string BY_FIELD_VALUE;
    static const std::string LOWER;
    static const std::string UPPER;
    static const std::string PREDICTION;
    static const std::string BUCKET_SPAN;
    static const std::string PROCESSED_RECORD_COUNT;
    static const std::string CREATE_TIME;
    static const std::string TIMESTAMP;
    static const std::string START_TIME;
    static const std::string END_TIME;
    static const std::string EXPIRY_TIME;
    static const std::string MEMORY_USAGE;
    static const std::string MESSAGES;
    static const std::string PROCESSING_TIME_MS;
    static const std::string PROGRESS;
    static const std::string STATUS;

public:
    //! Create a DataSink instance
    //!
    //! \param jobId The job ID.
    //! \param forecastId The forecast ID.
    //! \param forecastAlias The forecast alias.
    //! \param createTime Forecast create time.
    //! \param startTime Forecast start time.
    //! \param endTime Forecast end time.
    //! \param expiryTime Forecast expiry time.
    //! \param memoryUsage Forecast memory usage for models.
    //! \param outStream The wrapped output stream; NOTE(review): presumably
    //! the stream m_Writer writes to - confirm against the implementation.
    CForecastDataSink(const std::string& jobId,
                      const std::string& forecastId,
                      const std::string& forecastAlias,
                      core_t::TTime createTime,
                      core_t::TTime startTime,
                      core_t::TTime endTime,
                      core_t::TTime expiryTime,
                      std::size_t memoryUsage,
                      core::CJsonOutputStreamWrapper& outStream);

    //! Push a forecast datapoint
    //! Note: No forecasting for models with over field, therefore no over field
    //!
    //! NOTE(review): \p errorBar is passed by (const) value, i.e. it is
    //! copied on every call. Switching to a const reference would have to be
    //! done together with the out-of-line definition, so the signature is
    //! left untouched here.
    void push(const maths::common::SErrorBar errorBar,
              const std::string& feature,
              const std::string& partitionFieldName,
              const std::string& partitionFieldValue,
              const std::string& byFieldName,
              const std::string& byFieldValue,
              int detectorIndex);

    //! Write Statistics about the forecast, also marks the ending
    //!
    //! \param progress Progress value to report.
    //! \param runtime Runtime to report; NOTE(review): presumably in
    //! milliseconds given the PROCESSING_TIME_MS constant - confirm.
    //! \param messages Set of messages to attach.
    //! \param successful Whether the forecast succeeded (defaults to true).
    void writeStats(const double progress,
                    std::uint64_t runtime,
                    const TStrUMap& messages,
                    bool successful = true);

    //! Write a scheduled message to signal that validation was successful
    void writeScheduledMessage();

    //! Write an error message to signal a problem with forecasting
    void writeErrorMessage(const std::string& message);

    //! Write a message to signal that forecasting is complete
    //!
    //! This is used when exiting early but not as a result of an error
    void writeFinalMessage(const std::string& message);

    //! Get the number of forecast records written
    std::uint64_t numRecordsWritten() const;

private:
    //! Helper operating on a JSON object that is passed by non-const
    //! reference. NOTE(review): presumably adds the fields shared by all
    //! stats documents to \p doc - confirm against the implementation.
    void writeCommonStatsFields(json::object& doc);

    //! Private overload of push taking a JSON object and a flush flag.
    //! NOTE(review): presumably writes \p doc via m_Writer and flushes when
    //! \p flush is true - confirm against the implementation.
    void push(bool flush, json::object& doc);

private:
    //! The job ID
    std::string m_JobId;

    //! The forecast ID
    std::string m_ForecastId;

    //! The forecast alias
    std::string m_ForecastAlias;

    //! JSON line writer
    core::CBoostJsonConcurrentLineWriter m_Writer;

    //! Count of how many records have been written
    std::uint64_t m_NumRecordsWritten;

    //! Forecast create time
    core_t::TTime m_CreateTime;

    //! Forecast start time
    core_t::TTime m_StartTime;

    //! Forecast end time
    core_t::TTime m_EndTime;

    //! Forecast expiry time
    core_t::TTime m_ExpiryTime;

    //! Forecast memory usage for models
    std::size_t m_MemoryUsage;
};
} /* namespace model */
} /* namespace ml */
#endif /* INCLUDED_ml_model_CForecastDataSink_h */