void ProcessorSplitMultilineLogStringNative::ProcessEvent()

in core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.cpp [128:310]


// Splits the content of a single log event into multi-line logs according to the configured
// start / continue / end patterns, appending the resulting events to `newEvents`.
//
// Parameters:
//   logGroup     - event group that owns the source buffer; a copy of mSourceKey is placed in that buffer
//                  and the group is forwarded to CreateNewEvent / HandleUnmatchLogs.
//   logPath      - forwarded untouched to HandleUnmatchLogs.
//   e            - the event to split. It is moved into `newEvents` unchanged when it is not a supported
//                  event, when it does not have exactly one field, or when it lacks mSourceKey.
//   newEvents    - output container receiving either the untouched event or the events built from it.
//   inputLines   - incremented once per line returned by GetNextLine; dereferenced unconditionally, so it
//                  must not be null.
//   unmatchLines - forwarded untouched to HandleUnmatchLogs.
//
// State kept while scanning:
//   isPartialLog    - true while a multi-line log has been opened and is still being accumulated.
//   multiStartIndex - pointer to the first byte of the log currently being accumulated.
//
// NOTE(review): `exception` is filled by BoostRegexSearch but is never read in this function, so regex
// errors reported through it are dropped here — confirm this is intended.
void ProcessorSplitMultilineLogStringNative::ProcessEvent(PipelineEventGroup& logGroup,
                                                          StringView logPath,
                                                          PipelineEventPtr&& e,
                                                          EventsContainer& newEvents,
                                                          int* inputLines,
                                                          int* unmatchLines) {
    if (!IsSupportedEvent(e)) {
        // unsupported events are passed through as is
        newEvents.emplace_back(std::move(e));
        return;
    }
    LogEvent& sourceEvent = e.Cast<LogEvent>();

    // the event must carry exactly one field, and that field must be the configured source key
    std::string errorMsg;
    if (sourceEvent.Size() != 1) {
        errorMsg = "log event fields cnt does not equal to 1";
    } else if (!sourceEvent.HasContent(mSourceKey)) {
        errorMsg = "log event does not have content key";
    }
    if (!errorMsg.empty()) {
        // keep the malformed event in the output, then report the problem through the log and the alarm
        newEvents.emplace_back(std::move(e));
        LOG_ERROR(mContext->GetLogger(),
                  ("unexpected error", errorMsg)("processor", sName)("config", mContext->GetConfigName()));
        mContext->GetAlarm().SendAlarm(SPLIT_LOG_FAIL_ALARM,
                                       "unexpected error: " + errorMsg + ".\tprocessor: " + sName
                                           + "\tconfig: " + mContext->GetConfigName(),
                                       mContext->GetRegion(),
                                       mContext->GetProjectName(),
                                       mContext->GetConfigName(),
                                       mContext->GetLogstoreName());
        return;
    }

    StringView sourceVal = sourceEvent.GetContent(mSourceKey);
    StringBuffer sourceKey = logGroup.GetSourceBuffer()->CopyString(mSourceKey);

    std::string exception;
    const char* multiStartIndex = nullptr;
    bool isPartialLog = false;
    if (!HasStartPattern() && !HasContinuePattern() && HasEndPattern()) {
        // if only end pattern is given, then it will stick to this state
        isPartialLog = true;
        multiStartIndex = sourceVal.data();
    }

    size_t begin = 0;
    while (begin < sourceVal.size()) {
        StringView content = GetNextLine(sourceVal, begin);
        // true when the current line reaches the end of the source value
        bool isLastLog = begin + content.size() == sourceVal.size();
        ++(*inputLines);
        if (!isPartialLog) {
            // it is impossible to enter this state if only end pattern is given
            //
            // The pattern that may open a new log is the start pattern when one is configured, otherwise
            // the continue pattern. It is bound by reference so that no regex object is copied per line,
            // and it is selected inside this branch so the getters are only called in the states where
            // they were called before.
            const boost::regex& openingReg = HasStartPattern() ? GetStartPatternReg() : GetContinuePatternReg();
            if (BoostRegexSearch(content.data(), content.size(), openingReg, exception)) {
                multiStartIndex = content.data();
                isPartialLog = true;
            } else if (HasEndPattern() && !HasStartPattern() && HasContinuePattern()
                       && BoostRegexSearch(content.data(), content.size(), GetEndPatternReg(), exception)) {
                // case: continue + end
                // the line matches the end pattern only, so it forms a complete log on its own
                CreateNewEvent(content, isLastLog, sourceKey, sourceEvent, logGroup, newEvents);
                // NOTE(review): on the final line this points past the end of sourceVal; it is only
                // compared afterwards, never dereferenced in this function.
                multiStartIndex = content.data() + content.size() + 1;
                ADD_COUNTER(mMatchedEventsTotal, 1);
            } else {
                HandleUnmatchLogs(
                    content, isLastLog, sourceKey, sourceEvent, logGroup, newEvents, logPath, unmatchLines);
            }
        } else {
            // case: start + continue or continue + end
            if (HasContinuePattern()
                && BoostRegexSearch(content.data(), content.size(), GetContinuePatternReg(), exception)) {
                // the line belongs to the log being accumulated, move on to the next line
                begin += content.size() + 1;
                continue;
            }
            if (HasEndPattern()) {
                // case: start + end or continue + end or end
                if (HasContinuePattern()) {
                    // current line is not matched against the continue pattern, so the end pattern will decide
                    // if the current log is a match or not
                    if (BoostRegexSearch(content.data(), content.size(), GetEndPatternReg(), exception)) {
                        CreateNewEvent(StringView(multiStartIndex, content.data() + content.size() - multiStartIndex),
                                       isLastLog,
                                       sourceKey,
                                       sourceEvent,
                                       logGroup,
                                       newEvents);
                        ADD_COUNTER(mMatchedEventsTotal, 1);
                    } else {
                        // everything accumulated so far, including the current line, is unmatched
                        HandleUnmatchLogs(
                            StringView(multiStartIndex, content.data() + content.size() - multiStartIndex),
                            isLastLog,
                            sourceKey,
                            sourceEvent,
                            logGroup,
                            newEvents,
                            logPath,
                            unmatchLines);
                    }
                    isPartialLog = false;
                } else {
                    // case: start + end or end
                    if (BoostRegexSearch(content.data(), content.size(), GetEndPatternReg(), exception)) {
                        CreateNewEvent(StringView(multiStartIndex, content.data() + content.size() - multiStartIndex),
                                       isLastLog,
                                       sourceKey,
                                       sourceEvent,
                                       logGroup,
                                       newEvents);
                        if (HasStartPattern()) {
                            isPartialLog = false;
                        } else {
                            // if only end pattern is given, start another log automatically
                            multiStartIndex = content.data() + content.size() + 1;
                        }
                        ADD_COUNTER(mMatchedEventsTotal, 1);
                    }
                    // no continue pattern given, and the current line is not matched against the end pattern,
                    // so wait for the next line
                }
            } else {
                // NOTE(review): in both cases below the emitted event ends before the current line, yet
                // it receives the current line's isLastLog flag — confirm against CreateNewEvent that a
                // `true` value is harmless for an event that is not the trailing one.
                if (!HasContinuePattern()) {
                    // case: start
                    // a new start line closes the log accumulated so far (up to, but excluding, the
                    // separator byte right before the current line) and opens the next one
                    if (BoostRegexSearch(content.data(), content.size(), GetStartPatternReg(), exception)) {
                        CreateNewEvent(StringView(multiStartIndex, content.data() - 1 - multiStartIndex),
                                       isLastLog,
                                       sourceKey,
                                       sourceEvent,
                                       logGroup,
                                       newEvents);
                        multiStartIndex = content.data();
                        ADD_COUNTER(mMatchedEventsTotal, 1);
                    }
                } else {
                    // case: start + continue
                    // continue pattern is given, but current line is not matched against the continue pattern
                    CreateNewEvent(StringView(multiStartIndex, content.data() - 1 - multiStartIndex),
                                   isLastLog,
                                   sourceKey,
                                   sourceEvent,
                                   logGroup,
                                   newEvents);
                    ADD_COUNTER(mMatchedEventsTotal, 1);
                    if (!BoostRegexSearch(content.data(), content.size(), GetStartPatternReg(), exception)) {
                        // when no end pattern is given, the only chance to enter unmatched state is when both
                        // start and continue pattern are given, and the current line is not matched against the
                        // start pattern
                        HandleUnmatchLogs(
                            content, isLastLog, sourceKey, sourceEvent, logGroup, newEvents, logPath, unmatchLines);
                        isPartialLog = false;
                    } else {
                        multiStartIndex = content.data();
                    }
                }
            }
        }
        begin += content.size() + 1;
    }
    // when in unmatched state, the unmatched log is handled one by one, so there is no need for additional handle
    // here
    // flush whatever is still being accumulated, provided it starts inside the source value
    if (isPartialLog && multiStartIndex - sourceVal.data() < static_cast<int64_t>(sourceVal.size())) {
        if (!HasEndPattern()) {
            // without an end pattern the remainder counts as a complete log
            CreateNewEvent(StringView(multiStartIndex, sourceVal.data() + sourceVal.size() - multiStartIndex),
                           true,
                           sourceKey,
                           sourceEvent,
                           logGroup,
                           newEvents);
            ADD_COUNTER(mMatchedEventsTotal, 1);
        } else {
            // an end pattern was expected but never seen, so the remainder is unmatched
            HandleUnmatchLogs(StringView(multiStartIndex, sourceVal.data() + sourceVal.size() - multiStartIndex),
                              true,
                              sourceKey,
                              sourceEvent,
                              logGroup,
                              newEvents,
                              logPath,
                              unmatchLines);
        }
    }
}