in core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.cpp [128:310]
/// Splits the content of one log event into multiline logs and appends the results to `newEvents`.
///
/// The content stored under `mSourceKey` is walked line by line (via `GetNextLine`). Depending on
/// which of the start / continue / end patterns are configured, consecutive lines are merged into
/// one log and emitted through `CreateNewEvent`, or routed to `HandleUnmatchLogs`.
///
/// Two states are tracked:
///   - not in a log (`isPartialLog == false`): waiting for a line that opens a new log;
///   - in a log (`isPartialLog == true`): `multiStartIndex` points at the first byte of the log
///     currently being accumulated.
///
/// @param logGroup     Event group; its source buffer receives a copy of `mSourceKey`, and it is
///                     forwarded to `CreateNewEvent` / `HandleUnmatchLogs`.
/// @param logPath      Forwarded unchanged to `HandleUnmatchLogs`.
/// @param e            Event to split. Moved into `newEvents` untouched when it is unsupported or
///                     malformed (field count != 1, or `mSourceKey` missing).
/// @param newEvents    Output container.
/// @param inputLines   Incremented once per line read from the content. Must not be null.
/// @param unmatchLines Forwarded to `HandleUnmatchLogs`.
void ProcessorSplitMultilineLogStringNative::ProcessEvent(PipelineEventGroup& logGroup,
                                                          StringView logPath,
                                                          PipelineEventPtr&& e,
                                                          EventsContainer& newEvents,
                                                          int* inputLines,
                                                          int* unmatchLines) {
    if (!IsSupportedEvent(e)) {
        newEvents.emplace_back(std::move(e));
        return;
    }
    LogEvent& sourceEvent = e.Cast<LogEvent>();
    std::string errorMsg;
    if (sourceEvent.Size() != 1) {
        errorMsg = "log event fields cnt does not equal to 1";
    } else if (!sourceEvent.HasContent(mSourceKey)) {
        errorMsg = "log event does not have content key";
    }
    if (!errorMsg.empty()) {
        // Malformed event: pass it through unchanged, then report. `sourceEvent` must not be used
        // past this point because `e` has been moved from.
        newEvents.emplace_back(std::move(e));
        LOG_ERROR(mContext->GetLogger(),
                  ("unexpected error", errorMsg)("processor", sName)("config", mContext->GetConfigName()));
        mContext->GetAlarm().SendAlarm(SPLIT_LOG_FAIL_ALARM,
                                       "unexpected error: " + errorMsg + ".\tprocessor: " + sName
                                           + "\tconfig: " + mContext->GetConfigName(),
                                       mContext->GetRegion(),
                                       mContext->GetProjectName(),
                                       mContext->GetConfigName(),
                                       mContext->GetLogstoreName());
        return;
    }

    StringView sourceVal = sourceEvent.GetContent(mSourceKey);
    StringBuffer sourceKey = logGroup.GetSourceBuffer()->CopyString(mSourceKey);
    // Handed to every BoostRegexSearch call; never inspected in this function.
    // NOTE(review): presumably an out-parameter for regex errors, which would then be silently
    // dropped here - confirm whether that is intended.
    std::string exception;
    const char* multiStartIndex = nullptr;
    bool isPartialLog = false;
    if (!HasStartPattern() && !HasContinuePattern() && HasEndPattern()) {
        // if only end pattern is given, then it will stick to this state
        isPartialLog = true;
        multiStartIndex = sourceVal.data();
    }

    // Span from the start of the log being accumulated through the end of `line` (inclusive).
    const auto logThroughLine = [&multiStartIndex](const StringView& line) {
        return StringView(multiStartIndex, line.data() + line.size() - multiStartIndex);
    };
    // Span from the start of the log being accumulated up to, but excluding, the single separator
    // character that precedes `line`.
    const auto logBeforeLine = [&multiStartIndex](const StringView& line) {
        return StringView(multiStartIndex, line.data() - 1 - multiStartIndex);
    };

    size_t begin = 0;
    while (begin < sourceVal.size()) {
        StringView content = GetNextLine(sourceVal, begin);
        // True when the current line reaches the end of the source content.
        bool isLastLog = begin + content.size() == sourceVal.size();
        ++(*inputLines);
        if (!isPartialLog) {
            // it is impossible to enter this state if only end pattern is given
            //
            // Bind to the configured pattern instead of copying it into a fresh regex object for
            // every line. The getters are deliberately evaluated only in this state, exactly as
            // before, so the continue pattern is never fetched in end-only mode.
            const auto& headRegex = HasStartPattern() ? GetStartPatternReg() : GetContinuePatternReg();
            if (BoostRegexSearch(content.data(), content.size(), headRegex, exception)) {
                // The line opens a new log.
                multiStartIndex = content.data();
                isPartialLog = true;
            } else if (HasEndPattern() && !HasStartPattern() && HasContinuePattern()
                       && BoostRegexSearch(content.data(), content.size(), GetEndPatternReg(), exception)) {
                // case: continue + end
                // A lone line matching only the end pattern is a complete log on its own.
                CreateNewEvent(content, isLastLog, sourceKey, sourceEvent, logGroup, newEvents);
                multiStartIndex = content.data() + content.size() + 1;
                ADD_COUNTER(mMatchedEventsTotal, 1);
            } else {
                HandleUnmatchLogs(
                    content, isLastLog, sourceKey, sourceEvent, logGroup, newEvents, logPath, unmatchLines);
            }
        } else {
            // case: start + continue or continue + end
            if (HasContinuePattern()
                && BoostRegexSearch(content.data(), content.size(), GetContinuePatternReg(), exception)) {
                // The line extends the current log; advance past it and its separator character.
                begin += content.size() + 1;
                continue;
            }
            if (HasEndPattern()) {
                // case: start + end or continue + end or end
                if (HasContinuePattern()) {
                    // current line is not matched against the continue pattern, so the end pattern will decide
                    // if the current log is a match or not
                    if (BoostRegexSearch(content.data(), content.size(), GetEndPatternReg(), exception)) {
                        CreateNewEvent(
                            logThroughLine(content), isLastLog, sourceKey, sourceEvent, logGroup, newEvents);
                        ADD_COUNTER(mMatchedEventsTotal, 1);
                    } else {
                        HandleUnmatchLogs(logThroughLine(content),
                                          isLastLog,
                                          sourceKey,
                                          sourceEvent,
                                          logGroup,
                                          newEvents,
                                          logPath,
                                          unmatchLines);
                    }
                    isPartialLog = false;
                } else {
                    // case: start + end or end
                    if (BoostRegexSearch(content.data(), content.size(), GetEndPatternReg(), exception)) {
                        CreateNewEvent(
                            logThroughLine(content), isLastLog, sourceKey, sourceEvent, logGroup, newEvents);
                        if (HasStartPattern()) {
                            isPartialLog = false;
                        } else {
                            // if only end pattern is given, start another log automatically
                            multiStartIndex = content.data() + content.size() + 1;
                        }
                        ADD_COUNTER(mMatchedEventsTotal, 1);
                    }
                    // no continue pattern given, and the current line is not matched against the end
                    // pattern, so wait for the next line
                }
            } else {
                if (!HasContinuePattern()) {
                    // case: start
                    // A new start line closes the log accumulated so far and opens the next one.
                    if (BoostRegexSearch(content.data(), content.size(), GetStartPatternReg(), exception)) {
                        // NOTE(review): isLastLog describes the current line, while the log flushed
                        // here ends on the previous line - confirm CreateNewEvent expects that.
                        CreateNewEvent(
                            logBeforeLine(content), isLastLog, sourceKey, sourceEvent, logGroup, newEvents);
                        multiStartIndex = content.data();
                        ADD_COUNTER(mMatchedEventsTotal, 1);
                    }
                } else {
                    // case: start + continue
                    // continue pattern is given, but current line is not matched against the continue pattern
                    // NOTE(review): isLastLog describes the current line, while the log flushed here
                    // ends on the previous line - confirm CreateNewEvent expects that.
                    CreateNewEvent(
                        logBeforeLine(content), isLastLog, sourceKey, sourceEvent, logGroup, newEvents);
                    ADD_COUNTER(mMatchedEventsTotal, 1);
                    if (!BoostRegexSearch(content.data(), content.size(), GetStartPatternReg(), exception)) {
                        // when no end pattern is given, the only chance to enter unmatched state is when both
                        // start and continue pattern are given, and the current line is not matched against the
                        // start pattern
                        HandleUnmatchLogs(
                            content, isLastLog, sourceKey, sourceEvent, logGroup, newEvents, logPath, unmatchLines);
                        isPartialLog = false;
                    } else {
                        multiStartIndex = content.data();
                    }
                }
            }
        }
        begin += content.size() + 1;
    }

    // when in unmatched state, the unmatched log is handled one by one, so there is no need for additional handle
    // here
    //
    // Flush whatever is still being accumulated, provided its start lies inside the source content.
    // Without an end pattern the remainder is a complete log; with one, it never saw its end line
    // and is handed to HandleUnmatchLogs.
    if (isPartialLog && multiStartIndex - sourceVal.data() < static_cast<int64_t>(sourceVal.size())) {
        if (!HasEndPattern()) {
            CreateNewEvent(StringView(multiStartIndex, sourceVal.data() + sourceVal.size() - multiStartIndex),
                           true,
                           sourceKey,
                           sourceEvent,
                           logGroup,
                           newEvents);
            ADD_COUNTER(mMatchedEventsTotal, 1);
        } else {
            HandleUnmatchLogs(StringView(multiStartIndex, sourceVal.data() + sourceVal.size() - multiStartIndex),
                              true,
                              sourceKey,
                              sourceEvent,
                              logGroup,
                              newEvents,
                              logPath,
                              unmatchLines);
        }
    }
}