in core/plugin/processor/inner/ProcessorMergeMultilineLogNative.cpp [161:324]
// Merges the log events of `logGroup` into multiline logs according to the configured
// start / continue / end patterns, compacting the result in place.
//
// The loop is a two-state machine driven by `isPartialLog`:
//   - false ("unmatched" state): no log is being assembled; every line is tested against the
//     pattern that can open a log (start pattern if configured, otherwise continue pattern).
//   - true: a log is being assembled; `events` holds its lines and `begin` is the index that
//     is moved to the output once the log is complete.
// Finished events are written to sourceEvents[0 .. newSize) and the container is shrunk to
// `newSize` before returning.
//
// If an unsupported event or an event without `mSourceKey` is encountered, merging stops:
// every event from `begin` (or from the current event when nothing is pending) to the end is
// moved to the output unmerged, and the function returns.
void ProcessorMergeMultilineLogNative::MergeLogsByRegex(PipelineEventGroup& logGroup) {
    /*
        Supported regex combination:
        1. start
        2. start + continue
        3. start + end
        4. continue + end
        5. end
    */
    auto& sourceEvents = logGroup.MutableEvents();
    // index of the event that is moved to the output when the pending log is completed
    size_t begin = 0;
    // number of output slots already filled at the front of sourceEvents
    size_t newSize = 0;
    // lines collected for the log that is currently being assembled
    std::vector<LogEvent*> events;
    // out-parameter required by BoostRegexSearch; its content is not inspected in this function
    std::string exception;
    bool isPartialLog = false;
    StringView logPath = logGroup.GetMetadata(EventGroupMetaKey::LOG_FILE_PATH_RESOLVED);

    // Nothing visible in this function changes the configured patterns, so look them up once
    // instead of calling the getters for every event.
    const auto& startPattern = mMultiline.GetStartPatternReg();
    const auto& continuePattern = mMultiline.GetContinuePatternReg();
    const auto& endPattern = mMultiline.GetEndPatternReg();

    if (startPattern == nullptr && continuePattern == nullptr && endPattern != nullptr) {
        // if only end pattern is given, then it will stick to this state
        isPartialLog = true;
    }
    for (size_t cur = 0; cur < sourceEvents.size(); ++cur) {
        if (!IsSupportedEvent(sourceEvents[cur])) {
            // give up merging: flush the pending range and everything after it as-is
            if (events.empty()) {
                begin = cur;
            }
            for (size_t i = begin; i < sourceEvents.size(); ++i) {
                sourceEvents[newSize++] = std::move(sourceEvents[i]);
            }
            sourceEvents.resize(newSize);
            return;
        }
        LogEvent* sourceEvent = &sourceEvents[cur].Cast<LogEvent>();
        if (sourceEvent->Empty()) {
            // Empty events are skipped: they are never added to `events`.
            // NOTE(review): in end-only mode `begin` is set to cur + 1 after a match, so it can
            // index a skipped empty event instead of events[0] — confirm that MergeEvents and
            // HandleUnmatchLogs tolerate this.
            continue;
        }
        if (!sourceEvent->HasContent(mSourceKey)) {
            // give up merging: flush the pending range and everything after it as-is, then report
            if (events.empty()) {
                begin = cur;
            }
            for (size_t i = begin; i < sourceEvents.size(); ++i) {
                sourceEvents[newSize++] = std::move(sourceEvents[i]);
            }
            sourceEvents.resize(newSize);
            LOG_ERROR(mContext->GetLogger(),
                      ("unexpected error", "Some events do not have the SourceKey.")("processor", sName)(
                          "SourceKey", mSourceKey)("config", mContext->GetConfigName()));
            mContext->GetAlarm().SendAlarm(SPLIT_LOG_FAIL_ALARM,
                                           "unexpected error: some events do not have the sourceKey.\tSourceKey: "
                                               + mSourceKey + "\tprocessor: " + sName
                                               + "\tconfig: " + mContext->GetConfigName(),
                                           mContext->GetRegion(),
                                           mContext->GetProjectName(),
                                           mContext->GetConfigName(),
                                           mContext->GetLogstoreName());
            return;
        }
        StringView sourceVal = sourceEvent->GetContent(mSourceKey);
        if (!isPartialLog) {
            // it is impossible to enter this state if only end pattern is given
            // Bind by reference: the pattern is only read, so copying the regex object for every
            // event is unnecessary.
            const boost::regex& openingPattern = (startPattern != nullptr) ? *startPattern : *continuePattern;
            if (BoostRegexSearch(sourceVal.data(), sourceVal.size(), openingPattern, exception)) {
                events.emplace_back(sourceEvent);
                begin = cur;
                isPartialLog = true;
            } else if (endPattern != nullptr && startPattern == nullptr && continuePattern != nullptr
                       && BoostRegexSearch(sourceVal.data(), sourceVal.size(), *endPattern, exception)) {
                // case: continue + end
                // current line is matched against the end pattern rather than the continue pattern
                begin = cur;
                ADD_COUNTER(mMergedEventsTotal, 1);
                sourceEvents[newSize++] = std::move(sourceEvents[begin]);
            } else {
                HandleUnmatchLogs(sourceEvents, newSize, cur, cur, logPath);
            }
        } else {
            // case: start + continue or continue + end
            if (continuePattern != nullptr
                && BoostRegexSearch(sourceVal.data(), sourceVal.size(), *continuePattern, exception)) {
                events.emplace_back(sourceEvent);
                continue;
            }
            if (endPattern != nullptr) {
                // case: start + end or continue + end or end
                events.emplace_back(sourceEvent);
                if (continuePattern != nullptr) {
                    // current line is not matched against the continue pattern, so the end pattern will decide if
                    // the current log is a match or not
                    if (BoostRegexSearch(sourceVal.data(), sourceVal.size(), *endPattern, exception)) {
                        MergeEvents(events, true);
                        sourceEvents[newSize++] = std::move(sourceEvents[begin]);
                    } else {
                        HandleUnmatchLogs(sourceEvents, newSize, begin, cur, logPath);
                        events.clear();
                    }
                    isPartialLog = false;
                } else {
                    // case: start + end or end
                    if (BoostRegexSearch(sourceVal.data(), sourceVal.size(), *endPattern, exception)) {
                        MergeEvents(events, true);
                        sourceEvents[newSize++] = std::move(sourceEvents[begin]);
                        if (startPattern != nullptr) {
                            isPartialLog = false;
                        } else {
                            // only end pattern is given, so start another log automatically
                            begin = cur + 1;
                        }
                    }
                    // no continue pattern given, and the current line is not matched against the end pattern, so
                    // wait for the next line
                }
            } else {
                if (continuePattern == nullptr) {
                    // case: start
                    if (!BoostRegexSearch(sourceVal.data(), sourceVal.size(), *startPattern, exception)) {
                        events.emplace_back(sourceEvent);
                    } else {
                        // a new start line closes the pending log and opens the next one
                        MergeEvents(events, true);
                        sourceEvents[newSize++] = std::move(sourceEvents[begin]);
                        begin = cur;
                        events.emplace_back(sourceEvent);
                    }
                } else {
                    // case: start + continue
                    // continue pattern is given, but current line is not matched against the continue pattern
                    MergeEvents(events, true);
                    sourceEvents[newSize++] = std::move(sourceEvents[begin]);
                    if (!BoostRegexSearch(sourceVal.data(), sourceVal.size(), *startPattern, exception)) {
                        // when no end pattern is given, the only chance to enter unmatched state is when both start
                        // and continue pattern are given, and the current line is not matched against the start
                        // pattern
                        HandleUnmatchLogs(sourceEvents, newSize, cur, cur, logPath);
                        isPartialLog = false;
                    } else {
                        begin = cur;
                        events.emplace_back(sourceEvent);
                    }
                }
            }
        }
    }
    // when in unmatched state, the unmatched log is handled one by one, so there is no need for additional handle
    // here
    if (isPartialLog && begin < sourceEvents.size()) {
        if (endPattern == nullptr) {
            // without an end pattern the pending lines form a complete log once the input is exhausted
            MergeEvents(events, true);
            sourceEvents[newSize++] = std::move(sourceEvents[begin]);
        } else {
            // an end pattern is configured but was never matched for the pending range
            HandleUnmatchLogs(sourceEvents, newSize, begin, sourceEvents.size() - 1, logPath);
        }
    }
    sourceEvents.resize(newSize);
}