void CJsonOutputWriter::writeBucket()

in lib/api/CJsonOutputWriter.cc [375:507]


void CJsonOutputWriter::writeBucket(bool isInterim,
                                    core_t::TTime bucketTime,
                                    SBucketData& bucketData,
                                    std::uint64_t bucketProcessingTime) {
    // Write records
    if (!bucketData.s_DocumentsToWrite.empty()) {
        // Sort the results so they are grouped by detector and
        // ordered by probability
        std::sort(bucketData.s_DocumentsToWrite.begin(),
                  bucketData.s_DocumentsToWrite.end(), DETECTOR_PROBABILITY_LESS);

        m_Writer.onObjectBegin();
        m_Writer.onKey(RECORDS);
        m_Writer.onArrayBegin();

        // Iterate over the different detectors that we have results for
        for (TDocumentWeakPtrIntPrVecItr detectorIter =
                 bucketData.s_DocumentsToWrite.begin();
             detectorIter != bucketData.s_DocumentsToWrite.end(); ++detectorIter) {
            // Write the document, adding some extra fields as we go
            int detectorIndex = detectorIter->second;
            TDocumentWeakPtr weakDoc = detectorIter->first;
            TDocumentPtr docPtr = weakDoc.lock();
            if (!docPtr) {
                LOG_ERROR(<< "Inconsistent program state. JSON document unavailable.");
                continue;
            }

            m_Writer.addIntFieldToObj(DETECTOR_INDEX, detectorIndex, *docPtr);
            m_Writer.addIntFieldToObj(BUCKET_SPAN, bucketData.s_BucketSpan, *docPtr);
            m_Writer.addStringFieldCopyToObj(JOB_ID, m_JobId, *docPtr);
            m_Writer.addTimeFieldToObj(TIMESTAMP, bucketTime, *docPtr);

            if (isInterim) {
                m_Writer.addBoolFieldToObj(IS_INTERIM, isInterim, *docPtr);
            }
            m_Writer.write(*docPtr);
        }
        m_Writer.onArrayEnd();
        m_Writer.onObjectEnd();
    }

    // Write influencers
    if (!bucketData.s_InfluencerDocuments.empty()) {
        m_Writer.onObjectBegin();
        m_Writer.onKey(INFLUENCERS);
        m_Writer.onArrayBegin();
        for (TDocumentWeakPtrVecItr influencerIter =
                 bucketData.s_InfluencerDocuments.begin();
             influencerIter != bucketData.s_InfluencerDocuments.end(); ++influencerIter) {
            TDocumentWeakPtr weakDoc = *influencerIter;
            TDocumentPtr docPtr = weakDoc.lock();
            if (!docPtr) {
                LOG_ERROR(<< "Inconsistent program state. JSON document unavailable.");
                continue;
            }

            m_Writer.addStringFieldCopyToObj(JOB_ID, m_JobId, *docPtr);
            m_Writer.addTimeFieldToObj(TIMESTAMP, bucketTime, *docPtr);
            if (isInterim) {
                m_Writer.addBoolFieldToObj(IS_INTERIM, isInterim, *docPtr);
            }
            m_Writer.addIntFieldToObj(BUCKET_SPAN, bucketData.s_BucketSpan, *docPtr);
            m_Writer.write(*docPtr);
        }
        m_Writer.onArrayEnd();
        m_Writer.onObjectEnd();
    }

    // Write bucket at the end, as some of its values need to iterate over records, etc.
    m_Writer.onObjectBegin();
    m_Writer.onKey(BUCKET);

    m_Writer.onObjectBegin();
    m_Writer.onKey(JOB_ID);
    m_Writer.onString(m_JobId);
    m_Writer.onKey(TIMESTAMP);
    m_Writer.onTime(bucketTime);

    m_Writer.onKey(ANOMALY_SCORE);
    m_Writer.onDouble(bucketData.s_MaxBucketInfluencerNormalizedAnomalyScore);
    m_Writer.onKey(INITIAL_SCORE);
    m_Writer.onDouble(bucketData.s_MaxBucketInfluencerNormalizedAnomalyScore);
    m_Writer.onKey(EVENT_COUNT);
    m_Writer.onUint64(bucketData.s_InputEventCount);
    if (isInterim) {
        m_Writer.onKey(IS_INTERIM);
        m_Writer.onBool(isInterim);
    }
    m_Writer.onKey(BUCKET_SPAN);
    m_Writer.onInt64(bucketData.s_BucketSpan);

    if (!bucketData.s_BucketInfluencerDocuments.empty()) {
        // Write the array of influencers
        m_Writer.onKey(BUCKET_INFLUENCERS);
        m_Writer.onArrayBegin();
        for (TDocumentWeakPtrVecItr influencerIter =
                 bucketData.s_BucketInfluencerDocuments.begin();
             influencerIter != bucketData.s_BucketInfluencerDocuments.end();
             ++influencerIter) {
            TDocumentWeakPtr weakDoc = *influencerIter;
            TDocumentPtr docPtr = weakDoc.lock();
            if (!docPtr) {
                LOG_ERROR(<< "Inconsistent program state. JSON document unavailable.");
                continue;
            }

            m_Writer.addStringFieldCopyToObj(JOB_ID, m_JobId, *docPtr);
            m_Writer.addTimeFieldToObj(TIMESTAMP, bucketTime, *docPtr);
            m_Writer.addIntFieldToObj(BUCKET_SPAN, bucketData.s_BucketSpan, *docPtr);
            if (isInterim) {
                m_Writer.addBoolFieldToObj(IS_INTERIM, isInterim, *docPtr);
            }
            m_Writer.write(*docPtr);
        }
        m_Writer.onArrayEnd();
    }

    m_Writer.onKey(PROCESSING_TIME);
    m_Writer.onUint64(bucketProcessingTime);

    if (bucketData.s_ScheduledEventDescriptions.empty() == false) {
        m_Writer.onKey(SCHEDULED_EVENTS);
        m_Writer.onArrayBegin();
        for (const auto& it : bucketData.s_ScheduledEventDescriptions) {
            m_Writer.onString(it);
        }
        m_Writer.onArrayEnd();
    }

    m_Writer.onObjectEnd();
    m_Writer.onObjectEnd();
}