in lib/api/CJsonOutputWriter.cc [375:507]
void CJsonOutputWriter::writeBucket(bool isInterim,
core_t::TTime bucketTime,
SBucketData& bucketData,
std::uint64_t bucketProcessingTime) {
// Write records
if (!bucketData.s_DocumentsToWrite.empty()) {
// Sort the results so they are grouped by detector and
// ordered by probability
std::sort(bucketData.s_DocumentsToWrite.begin(),
bucketData.s_DocumentsToWrite.end(), DETECTOR_PROBABILITY_LESS);
m_Writer.onObjectBegin();
m_Writer.onKey(RECORDS);
m_Writer.onArrayBegin();
// Iterate over the different detectors that we have results for
for (TDocumentWeakPtrIntPrVecItr detectorIter =
bucketData.s_DocumentsToWrite.begin();
detectorIter != bucketData.s_DocumentsToWrite.end(); ++detectorIter) {
// Write the document, adding some extra fields as we go
int detectorIndex = detectorIter->second;
TDocumentWeakPtr weakDoc = detectorIter->first;
TDocumentPtr docPtr = weakDoc.lock();
if (!docPtr) {
LOG_ERROR(<< "Inconsistent program state. JSON document unavailable.");
continue;
}
m_Writer.addIntFieldToObj(DETECTOR_INDEX, detectorIndex, *docPtr);
m_Writer.addIntFieldToObj(BUCKET_SPAN, bucketData.s_BucketSpan, *docPtr);
m_Writer.addStringFieldCopyToObj(JOB_ID, m_JobId, *docPtr);
m_Writer.addTimeFieldToObj(TIMESTAMP, bucketTime, *docPtr);
if (isInterim) {
m_Writer.addBoolFieldToObj(IS_INTERIM, isInterim, *docPtr);
}
m_Writer.write(*docPtr);
}
m_Writer.onArrayEnd();
m_Writer.onObjectEnd();
}
// Write influencers
if (!bucketData.s_InfluencerDocuments.empty()) {
m_Writer.onObjectBegin();
m_Writer.onKey(INFLUENCERS);
m_Writer.onArrayBegin();
for (TDocumentWeakPtrVecItr influencerIter =
bucketData.s_InfluencerDocuments.begin();
influencerIter != bucketData.s_InfluencerDocuments.end(); ++influencerIter) {
TDocumentWeakPtr weakDoc = *influencerIter;
TDocumentPtr docPtr = weakDoc.lock();
if (!docPtr) {
LOG_ERROR(<< "Inconsistent program state. JSON document unavailable.");
continue;
}
m_Writer.addStringFieldCopyToObj(JOB_ID, m_JobId, *docPtr);
m_Writer.addTimeFieldToObj(TIMESTAMP, bucketTime, *docPtr);
if (isInterim) {
m_Writer.addBoolFieldToObj(IS_INTERIM, isInterim, *docPtr);
}
m_Writer.addIntFieldToObj(BUCKET_SPAN, bucketData.s_BucketSpan, *docPtr);
m_Writer.write(*docPtr);
}
m_Writer.onArrayEnd();
m_Writer.onObjectEnd();
}
// Write bucket at the end, as some of its values need to iterate over records, etc.
m_Writer.onObjectBegin();
m_Writer.onKey(BUCKET);
m_Writer.onObjectBegin();
m_Writer.onKey(JOB_ID);
m_Writer.onString(m_JobId);
m_Writer.onKey(TIMESTAMP);
m_Writer.onTime(bucketTime);
m_Writer.onKey(ANOMALY_SCORE);
m_Writer.onDouble(bucketData.s_MaxBucketInfluencerNormalizedAnomalyScore);
m_Writer.onKey(INITIAL_SCORE);
m_Writer.onDouble(bucketData.s_MaxBucketInfluencerNormalizedAnomalyScore);
m_Writer.onKey(EVENT_COUNT);
m_Writer.onUint64(bucketData.s_InputEventCount);
if (isInterim) {
m_Writer.onKey(IS_INTERIM);
m_Writer.onBool(isInterim);
}
m_Writer.onKey(BUCKET_SPAN);
m_Writer.onInt64(bucketData.s_BucketSpan);
if (!bucketData.s_BucketInfluencerDocuments.empty()) {
// Write the array of influencers
m_Writer.onKey(BUCKET_INFLUENCERS);
m_Writer.onArrayBegin();
for (TDocumentWeakPtrVecItr influencerIter =
bucketData.s_BucketInfluencerDocuments.begin();
influencerIter != bucketData.s_BucketInfluencerDocuments.end();
++influencerIter) {
TDocumentWeakPtr weakDoc = *influencerIter;
TDocumentPtr docPtr = weakDoc.lock();
if (!docPtr) {
LOG_ERROR(<< "Inconsistent program state. JSON document unavailable.");
continue;
}
m_Writer.addStringFieldCopyToObj(JOB_ID, m_JobId, *docPtr);
m_Writer.addTimeFieldToObj(TIMESTAMP, bucketTime, *docPtr);
m_Writer.addIntFieldToObj(BUCKET_SPAN, bucketData.s_BucketSpan, *docPtr);
if (isInterim) {
m_Writer.addBoolFieldToObj(IS_INTERIM, isInterim, *docPtr);
}
m_Writer.write(*docPtr);
}
m_Writer.onArrayEnd();
}
m_Writer.onKey(PROCESSING_TIME);
m_Writer.onUint64(bucketProcessingTime);
if (bucketData.s_ScheduledEventDescriptions.empty() == false) {
m_Writer.onKey(SCHEDULED_EVENTS);
m_Writer.onArrayBegin();
for (const auto& it : bucketData.s_ScheduledEventDescriptions) {
m_Writer.onString(it);
}
m_Writer.onArrayEnd();
}
m_Writer.onObjectEnd();
m_Writer.onObjectEnd();
}