in core/plugin/processor/ProcessorParseDelimiterNative.cpp [206:364]
/**
 * Parse the delimiter-separated text stored under mSourceKey of one event and add
 * each resulting column to the same event as its own field.
 *
 * Flow:
 *   1. Events rejected by IsSupportedEvent, events without mSourceKey, and events whose
 *      content is empty or consists only of spaces/'\r' are counted (failed or
 *      key-not-found) and returned unchanged.
 *   2. Trailing ' '/'\r' and leading ' ' are trimmed from the content.
 *   3. If the separator is a single character different from mQuote, the quote-aware
 *      FSM parser is used; otherwise the line is split by SplitString.
 *   4. Column i is stored under mKeys[i]; columns beyond mKeys are stored under
 *      "__column<i>__" unless mExtractingPartialFields is set, in which case they (and
 *      keys equal to s_mDiscardedFieldKey) are skipped.
 *   5. The source field is removed unless parsing succeeded and mSourceKeyOverwritten is
 *      set; the raw text may then be re-added as directed by mCommonParserOptions.
 *
 * @param logPath  only used in warning log messages.
 * @param e        event parsed and modified in place.
 * @param metadata forwarded to mCommonParserOptions.ShouldEraseEvent.
 * @return false if mCommonParserOptions decides the event must be erased (it is then
 *         counted in mDiscardedEventsTotal), true otherwise.
 */
bool ProcessorParseDelimiterNative::ProcessEvent(const StringView& logPath,
                                                 PipelineEventPtr& e,
                                                 const GroupMetadata& metadata) {
    if (!IsSupportedEvent(e)) {
        ADD_COUNTER(mOutFailedEventsTotal, 1);
        return true;
    }
    LogEvent& sourceEvent = e.Cast<LogEvent>();
    if (!sourceEvent.HasContent(mSourceKey)) {
        ADD_COUNTER(mOutKeyNotFoundEventsTotal, 1);
        return true;
    }
    StringView buffer = sourceEvent.GetContent(mSourceKey);
    // Offsets are signed so the backwards trimming loop can terminate below index 0.
    // NOTE(review): content longer than INT32_MAX would not fit here; confirm an upstream
    // line-size limit makes that impossible.
    int32_t endIdx = static_cast<int32_t>(buffer.size());
    if (endIdx == 0) {
        ADD_COUNTER(mOutFailedEventsTotal, 1);
        return true;
    }
    // Trim trailing spaces and carriage returns.
    for (int32_t i = endIdx - 1; i >= 0; --i) {
        if (buffer.data()[i] == ' ' || '\r' == buffer.data()[i])
            endIdx = i;
        else
            break;
    }
    // Trim leading spaces.
    int32_t begIdx = 0;
    for (int32_t i = 0; i < endIdx; ++i) {
        if (buffer.data()[i] == ' ')
            begIdx = i + 1;
        else
            break;
    }
    if (begIdx >= endIdx) {
        // Nothing but whitespace: treated as a failed parse, event left as is.
        ADD_COUNTER(mOutFailedEventsTotal, 1);
        return true;
    }
    size_t reserveSize
        = mOverflowedFieldsTreatment == OverflowedFieldsTreatment::EXTEND ? (mKeys.size() + 10) : (mKeys.size() + 1);
    // Quote-aware parsing fills columnValues; plain splitting fills colBegIdxs/colLens
    // with (offset, length) pairs into buffer.
    std::vector<StringView> columnValues;
    std::vector<size_t> colBegIdxs;
    std::vector<size_t> colLens;
    bool parseSuccess = false;
    size_t parsedColCount = 0;
    // Quoting is only supported for a single-character separator that is not the quote char.
    bool useQuote = (mSeparator.size() == 1) && (mQuote != mSeparatorChar);
    if (mKeys.size() > 0) {
        if (useQuote) {
            columnValues.reserve(reserveSize);
            parseSuccess
                = mDelimiterModeFsmParserPtr->ParseDelimiterLine(buffer, begIdx, endIdx, columnValues, sourceEvent);
            // Unless EXTEND is configured, fold every column beyond mKeys into one extra
            // column. Each surplus value is written preceded by mSeparatorChar, so the folded
            // value begins with a separator.
            // NOTE(review): confirm this leading separator matches SplitString's output for
            // the same line in non-quote mode.
            if (!(mOverflowedFieldsTreatment == OverflowedFieldsTreatment::EXTEND)
                && columnValues.size() > mKeys.size()) {
                // size_t: this is a sum of string lengths and is passed on as a size.
                size_t requiredLen = 0;
                for (size_t i = mKeys.size(); i < columnValues.size(); ++i) {
                    requiredLen += 1 + columnValues[i].size();
                }
                StringBuffer sb = sourceEvent.GetSourceBuffer()->AllocateStringBuffer(requiredLen);
                char* extraFields = sb.data;
                for (size_t i = mKeys.size(); i < columnValues.size(); ++i) {
                    extraFields[0] = mSeparatorChar;
                    extraFields++;
                    memcpy(extraFields, columnValues[i].data(), columnValues[i].size());
                    extraFields += columnValues[i].size();
                }
                // Replace the surplus columns with the single folded column.
                columnValues.resize(mKeys.size());
                columnValues.push_back(StringView(sb.data, requiredLen));
            }
            parsedColCount = columnValues.size();
        } else {
            colBegIdxs.reserve(reserveSize);
            colLens.reserve(reserveSize);
            parseSuccess = SplitString(buffer.data(), begIdx, endIdx, colBegIdxs, colLens);
            parsedColCount = colBegIdxs.size();
        }
        if (parseSuccess) {
            // A successful parse still fails when it yields no column, or (unless shortened
            // lines are allowed) fewer columns than configured keys.
            if (parsedColCount == 0 || (!mAllowingShortenedFields && parsedColCount < mKeys.size())) {
                LOG_WARNING(
                    sLogger,
                    ("parse delimiter log fail, keys count unmatch "
                     "columns count, parsed",
                     parsedColCount)("required", mKeys.size())("log", buffer)("project", GetContext().GetProjectName())(
                        "logstore", GetContext().GetLogstoreName())("file", logPath));
                GetContext().GetAlarm().SendAlarm(PARSE_LOG_FAIL_ALARM,
                                                  std::string("keys count unmatch columns count :")
                                                      + ToString(parsedColCount) + ", required:"
                                                      + ToString(mKeys.size()) + ", logs:" + buffer.to_string(),
                                                  GetContext().GetRegion(),
                                                  GetContext().GetProjectName(),
                                                  GetContext().GetConfigName(),
                                                  GetContext().GetLogstoreName());
                parseSuccess = false;
            }
        } else {
            AlarmManager::GetInstance()->SendAlarm(PARSE_LOG_FAIL_ALARM,
                                                   std::string("parse delimiter log fail")
                                                       + ", logs:" + buffer.to_string(),
                                                   GetContext().GetRegion(),
                                                   GetContext().GetProjectName(),
                                                   GetContext().GetConfigName(),
                                                   GetContext().GetLogstoreName());
            parseSuccess = false;
        }
    } else {
        AlarmManager::GetInstance()->SendAlarm(PARSE_LOG_FAIL_ALARM,
                                               "no column keys defined",
                                               GetContext().GetRegion(),
                                               GetContext().GetProjectName(),
                                               GetContext().GetConfigName(),
                                               GetContext().GetLogstoreName());
        LOG_WARNING(sLogger,
                    ("parse delimiter log fail", "no column keys defined")("project", GetContext().GetProjectName())(
                        "logstore", GetContext().GetLogstoreName())("file", logPath));
        parseSuccess = false;
    }
    if (parseSuccess) {
        // Value of column idx, taken from whichever parser produced the columns.
        auto columnValueAt = [&](size_t idx) {
            return useQuote ? columnValues[idx] : StringView(buffer.data() + colBegIdxs[idx], colLens[idx]);
        };
        for (size_t idx = 0; idx < parsedColCount; ++idx) {
            if (idx < mKeys.size()) {
                if (mExtractingPartialFields && mKeys[idx] == s_mDiscardedFieldKey) {
                    continue;
                }
                AddLog(mKeys[idx], columnValueAt(idx), sourceEvent);
            } else {
                if (mExtractingPartialFields) {
                    continue;
                }
                // Positional key for a column without a configured name. The key is copied
                // into the event's source buffer, presumably because AddLog stores a view
                // rather than owning the text; confirm.
                std::string key = "__column" + ToString(idx) + "__";
                StringBuffer sb = sourceEvent.GetSourceBuffer()->CopyString(key);
                AddLog(StringView(sb.data, sb.size), columnValueAt(idx), sourceEvent);
            }
        }
        ADD_COUNTER(mOutSuccessfulEventsTotal, 1);
    } else {
        ADD_COUNTER(mOutFailedEventsTotal, 1);
    }
    // Keep the source field only when parsing succeeded and mSourceKeyOverwritten is set.
    // NOTE(review): buffer is still read below after DelContent; this relies on the content
    // bytes remaining valid after the field is removed. Confirm.
    if (!parseSuccess || !mSourceKeyOverwritten) {
        sourceEvent.DelContent(mSourceKey);
    }
    if (mCommonParserOptions.ShouldAddSourceContent(parseSuccess)) {
        AddLog(mCommonParserOptions.mRenamedSourceKey, buffer, sourceEvent, false);
    }
    if (mCommonParserOptions.ShouldAddLegacyUnmatchedRawLog(parseSuccess)) {
        AddLog(mCommonParserOptions.legacyUnmatchedRawLogKey, buffer, sourceEvent, false);
    }
    if (mCommonParserOptions.ShouldEraseEvent(parseSuccess, sourceEvent, metadata)) {
        ADD_COUNTER(mDiscardedEventsTotal, 1);
        return false;
    }
    return true;
}