bool ProcessorParseDelimiterNative::ProcessEvent()

in core/plugin/processor/ProcessorParseDelimiterNative.cpp [206:364]


bool ProcessorParseDelimiterNative::ProcessEvent(const StringView& logPath,
                                                 PipelineEventPtr& e,
                                                 const GroupMetadata& metadata) {
    if (!IsSupportedEvent(e)) {
        ADD_COUNTER(mOutFailedEventsTotal, 1);
        return true;
    }
    LogEvent& sourceEvent = e.Cast<LogEvent>();
    if (!sourceEvent.HasContent(mSourceKey)) {
        ADD_COUNTER(mOutKeyNotFoundEventsTotal, 1);
        return true;
    }
    StringView buffer = sourceEvent.GetContent(mSourceKey);

    int32_t endIdx = buffer.size();
    if (endIdx == 0) {
        ADD_COUNTER(mOutFailedEventsTotal, 1);
        return true;
    }

    for (int32_t i = endIdx - 1; i >= 0; --i) {
        if (buffer.data()[i] == ' ' || '\r' == buffer.data()[i])
            endIdx = i;
        else
            break;
    }
    int32_t begIdx = 0;
    for (int32_t i = 0; i < endIdx; ++i) {
        if (buffer.data()[i] == ' ')
            begIdx = i + 1;
        else
            break;
    }
    if (begIdx >= endIdx) {
        ADD_COUNTER(mOutFailedEventsTotal, 1);
        return true;
    }

    size_t reserveSize
        = mOverflowedFieldsTreatment == OverflowedFieldsTreatment::EXTEND ? (mKeys.size() + 10) : (mKeys.size() + 1);
    std::vector<StringView> columnValues;
    std::vector<size_t> colBegIdxs;
    std::vector<size_t> colLens;
    bool parseSuccess = false;
    size_t parsedColCount = 0;
    bool useQuote = (mSeparator.size() == 1) && (mQuote != mSeparatorChar);
    if (mKeys.size() > 0) {
        if (useQuote) {
            columnValues.reserve(reserveSize);
            parseSuccess
                = mDelimiterModeFsmParserPtr->ParseDelimiterLine(buffer, begIdx, endIdx, columnValues, sourceEvent);
            // handle auto extend
            if (!(mOverflowedFieldsTreatment == OverflowedFieldsTreatment::EXTEND)
                && columnValues.size() > mKeys.size()) {
                int requiredLen = 0;
                for (size_t i = mKeys.size(); i < columnValues.size(); ++i) {
                    requiredLen += 1 + columnValues[i].size();
                }
                StringBuffer sb = sourceEvent.GetSourceBuffer()->AllocateStringBuffer(requiredLen);
                char* extraFields = sb.data;
                for (size_t i = mKeys.size(); i < columnValues.size(); ++i) {
                    extraFields[0] = mSeparatorChar;
                    extraFields++;
                    memcpy(extraFields, columnValues[i].data(), columnValues[i].size());
                    extraFields += columnValues[i].size();
                }
                // remove extra fields
                columnValues.resize(mKeys.size());
                columnValues.push_back(StringView(sb.data, requiredLen));
            }
            parsedColCount = columnValues.size();
        } else {
            colBegIdxs.reserve(reserveSize);
            colLens.reserve(reserveSize);
            parseSuccess = SplitString(buffer.data(), begIdx, endIdx, colBegIdxs, colLens);
            parsedColCount = colBegIdxs.size();
        }

        if (parseSuccess) {
            if (parsedColCount <= 0 || (!mAllowingShortenedFields && parsedColCount < mKeys.size())) {
                LOG_WARNING(
                    sLogger,
                    ("parse delimiter log fail, keys count unmatch "
                     "columns count, parsed",
                     parsedColCount)("required", mKeys.size())("log", buffer)("project", GetContext().GetProjectName())(
                        "logstore", GetContext().GetLogstoreName())("file", logPath));
                GetContext().GetAlarm().SendAlarm(PARSE_LOG_FAIL_ALARM,
                                                  std::string("keys count unmatch columns count :")
                                                      + ToString(parsedColCount) + ", required:"
                                                      + ToString(mKeys.size()) + ", logs:" + buffer.to_string(),
                                                  GetContext().GetRegion(),
                                                  GetContext().GetProjectName(),
                                                  GetContext().GetConfigName(),
                                                  GetContext().GetLogstoreName());
                parseSuccess = false;
            }
        } else {
            AlarmManager::GetInstance()->SendAlarm(PARSE_LOG_FAIL_ALARM,
                                                   std::string("parse delimiter log fail")
                                                       + ", logs:" + buffer.to_string(),
                                                   GetContext().GetRegion(),
                                                   GetContext().GetProjectName(),
                                                   GetContext().GetConfigName(),
                                                   GetContext().GetLogstoreName());
            parseSuccess = false;
        }
    } else {
        AlarmManager::GetInstance()->SendAlarm(PARSE_LOG_FAIL_ALARM,
                                               "no column keys defined",
                                               GetContext().GetRegion(),
                                               GetContext().GetProjectName(),
                                               GetContext().GetConfigName(),
                                               GetContext().GetLogstoreName());
        LOG_WARNING(sLogger,
                    ("parse delimiter log fail", "no column keys defined")("project", GetContext().GetProjectName())(
                        "logstore", GetContext().GetLogstoreName())("file", logPath));
        parseSuccess = false;
    }

    if (parseSuccess) {
        for (uint32_t idx = 0; idx < parsedColCount; idx++) {
            if (mKeys.size() > idx) {
                if (mExtractingPartialFields && mKeys[idx] == s_mDiscardedFieldKey) {
                    continue;
                }
                AddLog(mKeys[idx],
                       useQuote ? columnValues[idx] : StringView(buffer.data() + colBegIdxs[idx], colLens[idx]),
                       sourceEvent);
            } else {
                if (mExtractingPartialFields) {
                    continue;
                }
                std::string key = "__column" + ToString(idx) + "__";
                StringBuffer sb = sourceEvent.GetSourceBuffer()->CopyString(key);
                AddLog(StringView(sb.data, sb.size),
                       useQuote ? columnValues[idx] : StringView(buffer.data() + colBegIdxs[idx], colLens[idx]),
                       sourceEvent);
            }
        }
        ADD_COUNTER(mOutSuccessfulEventsTotal, 1);
    } else {
        ADD_COUNTER(mOutFailedEventsTotal, 1);
    }

    if (!parseSuccess || !mSourceKeyOverwritten) {
        sourceEvent.DelContent(mSourceKey);
    }
    if (mCommonParserOptions.ShouldAddSourceContent(parseSuccess)) {
        AddLog(mCommonParserOptions.mRenamedSourceKey, buffer, sourceEvent, false);
    }
    if (mCommonParserOptions.ShouldAddLegacyUnmatchedRawLog(parseSuccess)) {
        AddLog(mCommonParserOptions.legacyUnmatchedRawLogKey, buffer, sourceEvent, false);
    }
    if (mCommonParserOptions.ShouldEraseEvent(parseSuccess, sourceEvent, metadata)) {
        ADD_COUNTER(mDiscardedEventsTotal, 1);
        return false;
    }
    return true;
}