lib/model/CForecastDataSink.cc (206 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#include <model/CForecastDataSink.h>
#include <core/CLogger.h>
#include <core/CScopedBoostJsonPoolAllocator.h>
#include <maths/common/CIntegerTools.h>
#include <maths/common/CModel.h>
#include <vector>
namespace ml {
namespace model {
namespace {
using TStrVec = std::vector<std::string>;
// static strings
const std::string STATUS_SCHEDULED("scheduled");
const std::string STATUS_STARTED("started");
const std::string STATUS_FINISHED("finished");
const std::string STATUS_FAILED("failed");
} // unnamed
// JSON field names
const std::string CForecastDataSink::JOB_ID("job_id");
const std::string CForecastDataSink::DETECTOR_INDEX("detector_index");
const std::string CForecastDataSink::FORECAST_ID("forecast_id");
const std::string CForecastDataSink::FORECAST_ALIAS("forecast_alias");
const std::string CForecastDataSink::MODEL_FORECAST("model_forecast");
const std::string CForecastDataSink::MODEL_FORECAST_STATS("model_forecast_request_stats");
const std::string CForecastDataSink::PARTITION_FIELD_NAME("partition_field_name");
const std::string CForecastDataSink::PARTITION_FIELD_VALUE("partition_field_value");
const std::string CForecastDataSink::FEATURE("model_feature");
const std::string CForecastDataSink::BY_FIELD_NAME("by_field_name");
const std::string CForecastDataSink::BY_FIELD_VALUE("by_field_value");
const std::string CForecastDataSink::LOWER("forecast_lower");
const std::string CForecastDataSink::UPPER("forecast_upper");
const std::string CForecastDataSink::PREDICTION("forecast_prediction");
const std::string CForecastDataSink::BUCKET_SPAN("bucket_span");
const std::string CForecastDataSink::PROCESSED_RECORD_COUNT("processed_record_count");
const std::string CForecastDataSink::CREATE_TIME("forecast_create_timestamp");
const std::string CForecastDataSink::TIMESTAMP("timestamp");
const std::string CForecastDataSink::START_TIME("forecast_start_timestamp");
const std::string CForecastDataSink::END_TIME("forecast_end_timestamp");
const std::string CForecastDataSink::EXPIRY_TIME("forecast_expiry_timestamp");
const std::string CForecastDataSink::MEMORY_USAGE("forecast_memory_bytes");
const std::string CForecastDataSink::MESSAGES("forecast_messages");
const std::string CForecastDataSink::PROCESSING_TIME_MS("processing_time_ms");
const std::string CForecastDataSink::PROGRESS("forecast_progress");
const std::string CForecastDataSink::STATUS("forecast_status");
using TScopedAllocator = core::CScopedBoostJsonPoolAllocator<core::CBoostJsonConcurrentLineWriter>;
CForecastDataSink::CForecastModelWrapper::CForecastModelWrapper(model_t::EFeature feature,
const std::string& byFieldValue,
TMathsModelPtr&& forecastModel,
core_t::TTime firstDataTime,
core_t::TTime lastDataTime)
: m_Feature(feature), m_ByFieldValue(byFieldValue),
m_ForecastModel(std::move(forecastModel)), m_FirstDataTime{firstDataTime}, m_LastDataTime{lastDataTime} {
}
bool CForecastDataSink::CForecastModelWrapper::forecast(const SForecastResultSeries& series,
core_t::TTime startTime,
core_t::TTime endTime,
double boundsPercentile,
CForecastDataSink& sink,
std::string& message) const {
core_t::TTime bucketLength{m_ForecastModel->params().bucketLength()};
startTime = model_t::sampleTime(m_Feature, startTime, bucketLength);
endTime = model_t::sampleTime(m_Feature, endTime, bucketLength);
model_t::TDouble1VecDouble1VecPr support{model_t::support(m_Feature)};
return m_ForecastModel->forecast(
m_FirstDataTime, m_LastDataTime, startTime, endTime, boundsPercentile,
support.first, support.second,
std::bind(static_cast<void (CForecastDataSink::*)(
const maths::common::SErrorBar, const std::string&, const std::string&,
const std::string&, const std::string&, const std::string&, int)>(
&model::CForecastDataSink::push),
&sink, std::placeholders::_1, model_t::print(m_Feature),
series.s_PartitionFieldName, series.s_PartitionFieldValue,
series.s_ByFieldName, m_ByFieldValue, series.s_DetectorIndex),
message);
}
CForecastDataSink::SForecastResultSeries::SForecastResultSeries(const SModelParams& modelParams)
: s_ModelParams(modelParams), s_DetectorIndex(), s_ToForecastPersisted(),
s_ByFieldName(), s_MinimumSeasonalVarianceScale(0.0) {
}
CForecastDataSink::CForecastDataSink(const std::string& jobId,
const std::string& forecastId,
const std::string& forecastAlias,
core_t::TTime createTime,
core_t::TTime startTime,
core_t::TTime endTime,
core_t::TTime expiryTime,
size_t memoryUsage,
core::CJsonOutputStreamWrapper& outStream)
: m_JobId(jobId), m_ForecastId(forecastId), m_ForecastAlias(forecastAlias),
m_Writer(outStream), m_NumRecordsWritten(0), m_CreateTime(createTime),
m_StartTime(startTime), m_EndTime(endTime), m_ExpiryTime(expiryTime),
m_MemoryUsage(memoryUsage) {
m_MemoryUsage += m_Writer.getOutputMemoryAllocatorUsage();
}
void CForecastDataSink::writeStats(const double progress,
std::uint64_t runtime,
const TStrUMap& messages,
bool successful) {
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
json::object doc = m_Writer.makeDoc();
this->writeCommonStatsFields(doc);
m_Writer.addUIntFieldToObj(MEMORY_USAGE, m_MemoryUsage, doc);
m_Writer.addUIntFieldToObj(PROCESSED_RECORD_COUNT, m_NumRecordsWritten, doc);
m_Writer.addDoubleFieldToObj(PROGRESS, progress, doc);
m_Writer.addUIntFieldToObj(PROCESSING_TIME_MS, runtime, doc);
json::value msgs = json::value_from(messages);
m_Writer.addStringArrayFieldToObj(MESSAGES, msgs.as_array(), doc);
if (progress < 1.0) {
m_Writer.addStringFieldReferenceToObj(STATUS, STATUS_STARTED, doc);
} else {
if (successful) {
m_Writer.addStringFieldReferenceToObj(STATUS, STATUS_FINISHED, doc);
} else {
m_Writer.addStringFieldReferenceToObj(STATUS, STATUS_FAILED, doc);
}
}
// only flush after the last record
this->push(progress == 1.0, doc);
}
void CForecastDataSink::writeScheduledMessage() {
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
json::object doc = m_Writer.makeDoc();
this->writeCommonStatsFields(doc);
m_Writer.addStringFieldReferenceToObj(STATUS, STATUS_SCHEDULED, doc);
this->push(true /*important, therefore flush*/, doc);
}
void CForecastDataSink::writeErrorMessage(const std::string& message) {
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
json::object doc = m_Writer.makeDoc();
this->writeCommonStatsFields(doc);
json::array messages{message};
m_Writer.addStringArrayFieldToObj(MESSAGES, messages, doc);
m_Writer.addStringFieldReferenceToObj(STATUS, STATUS_FAILED, doc);
this->push(true /*important, therefore flush*/, doc);
}
void CForecastDataSink::writeFinalMessage(const std::string& message) {
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
json::object doc = m_Writer.makeDoc();
this->writeCommonStatsFields(doc);
TStrVec messages{message};
m_Writer.addStringArrayFieldToObj(MESSAGES, messages, doc);
m_Writer.addDoubleFieldToObj(PROGRESS, 1.0, doc);
m_Writer.addStringFieldReferenceToObj(STATUS, STATUS_FINISHED, doc);
this->push(true /*important, therefore flush*/, doc);
}
void CForecastDataSink::writeCommonStatsFields(json::object& doc) {
m_Writer.addStringFieldReferenceToObj(JOB_ID, m_JobId, doc);
m_Writer.addStringFieldReferenceToObj(FORECAST_ID, m_ForecastId, doc);
if (m_ForecastAlias.empty() == false) {
m_Writer.addStringFieldReferenceToObj(FORECAST_ALIAS, m_ForecastAlias, doc);
}
m_Writer.addTimeFieldToObj(CREATE_TIME, m_CreateTime, doc);
m_Writer.addTimeFieldToObj(TIMESTAMP, m_StartTime, doc);
m_Writer.addTimeFieldToObj(START_TIME, m_StartTime, doc);
m_Writer.addTimeFieldToObj(END_TIME, m_EndTime, doc);
if (m_ExpiryTime != m_CreateTime) {
m_Writer.addTimeFieldToObj(EXPIRY_TIME, m_ExpiryTime, doc);
}
}
void CForecastDataSink::push(bool flush, json::object& doc) {
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
json::object wrapper = m_Writer.makeDoc();
m_Writer.addMember(MODEL_FORECAST_STATS, doc, wrapper);
m_Writer.write(wrapper);
if (flush) {
m_Writer.flush();
}
}
std::uint64_t CForecastDataSink::numRecordsWritten() const {
return m_NumRecordsWritten;
}
void CForecastDataSink::push(const maths::common::SErrorBar errorBar,
const std::string& feature,
const std::string& partitionFieldName,
const std::string& partitionFieldValue,
const std::string& byFieldName,
const std::string& byFieldValue,
int detectorIndex) {
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
++m_NumRecordsWritten;
json::object doc = m_Writer.makeDoc();
m_Writer.addStringFieldReferenceToObj(JOB_ID, m_JobId, doc);
m_Writer.addIntFieldToObj(DETECTOR_INDEX, detectorIndex, doc);
m_Writer.addStringFieldReferenceToObj(FORECAST_ID, m_ForecastId, doc);
if (m_ForecastAlias.empty() == false) {
m_Writer.addStringFieldReferenceToObj(FORECAST_ALIAS, m_ForecastAlias, doc);
}
m_Writer.addStringFieldCopyToObj(FEATURE, feature, doc, true);
// Time is in Java format - milliseconds since the epoch. Note this
// matches the Java notion of "bucket time" which is defined as the
// start of the bucket containing the forecast time.
core_t::TTime time{maths::common::CIntegerTools::floor(errorBar.s_Time,
errorBar.s_BucketLength)};
m_Writer.addTimeFieldToObj(TIMESTAMP, time, doc);
m_Writer.addIntFieldToObj(BUCKET_SPAN, errorBar.s_BucketLength, doc);
if (!partitionFieldName.empty()) {
m_Writer.addStringFieldCopyToObj(PARTITION_FIELD_NAME, partitionFieldName, doc);
m_Writer.addStringFieldCopyToObj(PARTITION_FIELD_VALUE,
partitionFieldValue, doc, true);
}
if (!byFieldName.empty()) {
m_Writer.addStringFieldCopyToObj(BY_FIELD_NAME, byFieldName, doc);
m_Writer.addStringFieldCopyToObj(BY_FIELD_VALUE, byFieldValue, doc, true);
}
m_Writer.addDoubleFieldToObj(LOWER, errorBar.s_LowerBound, doc);
m_Writer.addDoubleFieldToObj(UPPER, errorBar.s_UpperBound, doc);
m_Writer.addDoubleFieldToObj(PREDICTION, errorBar.s_Predicted, doc);
json::object wrapper = m_Writer.makeDoc();
m_Writer.addMember(MODEL_FORECAST, doc, wrapper);
m_Writer.write(wrapper);
}
} /* namespace model */
} /* namespace ml */