void ProcessorMergeMultilineLogNative::MergeLogsByRegex()

in core/plugin/processor/inner/ProcessorMergeMultilineLogNative.cpp [161:324]


// Merges consecutive single-line log events of `logGroup` into multi-line logs according to the configured
// start / continue / end patterns. The event vector is compacted in place: `newSize` is the write cursor, and the
// vector is truncated to `newSize` before returning.
//
// Processing stops early (all not-yet-emitted events are passed through untouched) as soon as an unsupported
// event, or a non-empty event without `mSourceKey`, is encountered. Empty log events are skipped.
void ProcessorMergeMultilineLogNative::MergeLogsByRegex(PipelineEventGroup& logGroup) {
    /*
        Supported regex combination:
        1. start
        2. start + continue
        3. start + end
        4. continue + end
        5. end
    */
    auto& sourceEvents = logGroup.MutableEvents();
    // begin: index of the first event of the log currently being assembled
    // newSize: number of events already written back to the front of sourceEvents
    size_t begin = 0, newSize = 0;
    // events buffered for the log currently being assembled
    // NOTE(review): the code below relies on MergeEvents() leaving `events` empty afterwards - confirm
    std::vector<LogEvent*> events;
    std::string exception;
    StringView logPath = logGroup.GetMetadata(EventGroupMetaKey::LOG_FILE_PATH_RESOLVED);

    // the configured patterns do not change while the group is processed, so look them up once
    const auto& startReg = mMultiline.GetStartPatternReg();
    const auto& continueReg = mMultiline.GetContinuePatternReg();
    const auto& endReg = mMultiline.GetEndPatternReg();
    const bool hasStart = startReg != nullptr;
    const bool hasContinue = continueReg != nullptr;
    const bool hasEnd = endReg != nullptr;

    // if only end pattern is given, then it will stick to this state
    bool isPartialLog = !hasStart && !hasContinue && hasEnd;

    // moves every event in [from, sourceEvents.size()) to the write cursor unchanged and truncates the vector
    auto passThroughFrom = [&sourceEvents, &newSize](size_t from) {
        for (size_t i = from; i < sourceEvents.size(); ++i) {
            sourceEvents[newSize++] = std::move(sourceEvents[i]);
        }
        sourceEvents.resize(newSize);
    };

    for (size_t cur = 0; cur < sourceEvents.size(); ++cur) {
        if (!IsSupportedEvent(sourceEvents[cur])) {
            // keep the buffered (not yet emitted) events as well as everything that follows
            passThroughFrom(events.empty() ? cur : begin);
            return;
        }
        LogEvent* sourceEvent = &sourceEvents[cur].Cast<LogEvent>();
        if (sourceEvent->Empty()) {
            continue;
        }
        if (!sourceEvent->HasContent(mSourceKey)) {
            passThroughFrom(events.empty() ? cur : begin);
            LOG_ERROR(mContext->GetLogger(),
                      ("unexpected error", "Some events do not have the SourceKey.")("processor", sName)(
                          "SourceKey", mSourceKey)("config", mContext->GetConfigName()));
            mContext->GetAlarm().SendAlarm(SPLIT_LOG_FAIL_ALARM,
                                           "unexpected error: some events do not have the sourceKey.\tSourceKey: "
                                               + mSourceKey + "\tprocessor: " + sName
                                               + "\tconfig: " + mContext->GetConfigName(),
                                           mContext->GetRegion(),
                                           mContext->GetProjectName(),
                                           mContext->GetConfigName(),
                                           mContext->GetLogstoreName());
            return;
        }
        StringView sourceVal = sourceEvent->GetContent(mSourceKey);

        if (!isPartialLog) {
            // it is impossible to enter this state if only end pattern is given, so at least one of start and
            // continue pattern exists; bind by reference instead of copying the regex for every event
            auto& firstLineRegex = hasStart ? *startReg : *continueReg;
            if (BoostRegexSearch(sourceVal.data(), sourceVal.size(), firstLineRegex, exception)) {
                events.emplace_back(sourceEvent);
                begin = cur;
                isPartialLog = true;
            } else if (hasEnd && !hasStart && hasContinue
                       && BoostRegexSearch(sourceVal.data(), sourceVal.size(), *endReg, exception)) {
                // case: continue + end
                // current line is matched against the end pattern rather than the continue pattern, so it forms a
                // complete log on its own
                begin = cur;
                ADD_COUNTER(mMergedEventsTotal, 1);
                sourceEvents[newSize++] = std::move(sourceEvents[begin]);
            } else {
                HandleUnmatchLogs(sourceEvents, newSize, cur, cur, logPath);
            }
            continue;
        }

        // from here on a log is being assembled
        // case: start + continue or continue + end
        if (hasContinue && BoostRegexSearch(sourceVal.data(), sourceVal.size(), *continueReg, exception)) {
            events.emplace_back(sourceEvent);
            continue;
        }

        if (hasEnd) {
            // case: start + end or continue + end or end
            if (events.empty()) {
                // Only reachable when only the end pattern is given: the current line is the first line of a new
                // log. Empty events skipped above may sit between the previous log and this line, so `begin` has
                // to be re-anchored here; otherwise an empty event would be emitted in place of the merged log.
                begin = cur;
            }
            events.emplace_back(sourceEvent);
            const bool endMatched = BoostRegexSearch(sourceVal.data(), sourceVal.size(), *endReg, exception);
            if (hasContinue) {
                // current line is not matched against the continue pattern, so the end pattern will decide if
                // the current log is a match or not
                if (endMatched) {
                    MergeEvents(events, true);
                    sourceEvents[newSize++] = std::move(sourceEvents[begin]);
                } else {
                    HandleUnmatchLogs(sourceEvents, newSize, begin, cur, logPath);
                    events.clear();
                }
                isPartialLog = false;
            } else if (endMatched) {
                // case: start + end or end
                MergeEvents(events, true);
                sourceEvents[newSize++] = std::move(sourceEvents[begin]);
                if (hasStart) {
                    isPartialLog = false;
                } else {
                    // only end pattern is given, so start another log automatically
                    begin = cur + 1;
                }
            }
            // otherwise: no continue pattern given, and the current line is not matched against the end pattern,
            // so wait for the next line
            continue;
        }

        // no end pattern is given, so the start pattern must exist
        if (!hasContinue) {
            // case: start
            if (BoostRegexSearch(sourceVal.data(), sourceVal.size(), *startReg, exception)) {
                // a new log starts at the current line, so the buffered one is complete
                MergeEvents(events, true);
                sourceEvents[newSize++] = std::move(sourceEvents[begin]);
                begin = cur;
            }
            events.emplace_back(sourceEvent);
        } else {
            // case: start + continue
            // continue pattern is given, but current line is not matched against the continue pattern, so the
            // buffered log is complete
            MergeEvents(events, true);
            sourceEvents[newSize++] = std::move(sourceEvents[begin]);
            if (BoostRegexSearch(sourceVal.data(), sourceVal.size(), *startReg, exception)) {
                begin = cur;
                events.emplace_back(sourceEvent);
            } else {
                // when no end pattern is given, the only chance to enter unmatched state is when both start
                // and continue pattern are given, and the current line is not matched against the start
                // pattern
                HandleUnmatchLogs(sourceEvents, newSize, cur, cur, logPath);
                isPartialLog = false;
            }
        }
    }

    // when in unmatched state, the unmatched log is handled one by one, so there is no need for additional handle
    // here
    if (isPartialLog && begin < sourceEvents.size()) {
        if (!hasEnd) {
            // without an end pattern the buffered lines form a complete log once the group is exhausted
            MergeEvents(events, true);
            sourceEvents[newSize++] = std::move(sourceEvents[begin]);
        } else {
            // the end pattern was never matched, so the buffered lines are unmatched
            HandleUnmatchLogs(sourceEvents, newSize, begin, sourceEvents.size() - 1, logPath);
        }
    }
    sourceEvents.resize(newSize);
}