confgenerator/logging_processors.go (418 lines of code) (raw):
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package confgenerator
import (
"context"
"fmt"
"sort"
"strings"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/filter"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit/modify"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel/ottl"
)
// ParseMultilineGroup selects one group of built-in multiline rules for a
// parse_multiline processor.
//
// TODO: Add a validation check ensuring that each language appears in at most
// one language_exceptions group.
type ParseMultilineGroup struct {
// Type is the kind of rule group; only "language_exceptions" is accepted.
Type string `yaml:"type" validate:"required,oneof=language_exceptions"`
// Language selects the rule list in multilineRulesLanguageMap ("java", "python" or "go").
Language string `yaml:"language" validate:"required,oneof=java python go"`
}
// ParseMultiline is the `parse_multiline` logging processor. It groups lines of
// language exception stack traces using the rule lists in
// multilineRulesLanguageMap, one list per configured group.
type ParseMultiline struct {
ConfigComponent `yaml:",inline"`
// Make this a list so that it's forward compatible with `parse_multiline` types other than the built-in language exceptions.
// Between one and three groups may be configured.
MultilineGroups []*ParseMultilineGroup `yaml:"match_any" validate:"required,min=1,max=3,unique"`
}
// Type returns the configuration type name, "parse_multiline".
func (r ParseMultiline) Type() string {
return "parse_multiline"
}
// multilineRulesLanguageMap maps a language name to its fluent-bit multiline
// parser rules. Each entry is one rule of the form
// "state name(s)" "/regex/" "next state"; every list starts from start_state.
var multilineRulesLanguageMap = map[string][]string{
// Below are the working Java rules provided by the Fluent Bit team: https://github.com/fluent/fluent-bit/issues/4611
// Move to built-in Java support when upstream fixes the issue.
"java": {`"start_state, java_start_exception" "/(?:Exception|Error|Throwable|V8 errors stack trace)[:\r\n]/" "java_after_exception"`,
`"java_nested_exception" "/(?:Exception|Error|Throwable|V8 errors stack trace)[:\r\n]/" "java_after_exception"`,
// NOTE(review): `[\\t ]` below differs from the `[\t ]` used by the other rules; confirm whether it is intended.
`"java_after_exception" "/^[\t ]*nested exception is:[\\t ]*/" "java_nested_exception"`,
`"java_after_exception" "/^[\r\n]*$/" "java_after_exception"`,
`"java_after_exception" "/^[\t ]+(?:eval )?at /" "java_after_exception"`,
`"java_after_exception" "/^[\t ]+--- End of inner exception stack trace ---$/" "java_after_exception"`,
`"java_after_exception" "/^--- End of stack trace from previous (?x:)location where exception was thrown ---$/" "java_after_exception"`,
`"java_after_exception" "/^[\t ]*(?:Caused by|Suppressed):/" "java_after_exception"`,
`"java_after_exception" "/^[\t ]*... \d+ (?:more|common frames omitted)/" "java_after_exception"`},
// Python rules: a "Traceback" header, indented "File" frames with their code lines, then the exception line.
"python": {`"start_state, python_start_exception" "/Traceback \(most recent call last\):$/" "python"`,
`"python" "/^[\t ]+File /" "python_code"`,
`"python_code" "/[^\t ]/" "python"`,
`"python" "/^(?:[^\s.():]+\.)*[^\s.():]+:/" "python_start_exception"`},
// Go rules: a panic line, optional [signal ...] line, then goroutine headers and alternating frame lines.
// NOTE(review): the first `go_after_panic` "/^$/" rule below appears redundant with the combined rule that follows it.
"go": {`"start_state" "/\bpanic: /" "go_after_panic"`,
`"start_state" "/http: panic serving/" "go_goroutine"`,
`"go_after_panic" "/^$/" "go_goroutine"`,
`"go_after_panic, go_after_signal, go_frame_1" "/^$/" "go_goroutine"`,
`"go_after_panic" "/^\[signal /" "go_after_signal"`,
`"go_goroutine" "/^goroutine \d+ \[[^\]]+\]:$/" "go_frame_1"`,
`"go_frame_1" "/^(?:[^\s.:]+\.)*[^\s.():]+\(|^created by /" "go_frame_2"`,
`"go_frame_2" "/^\s/" "go_frame_1"`},
}
// Components returns the fluent-bit components for parse_multiline:
//   - a rename of `log` to `message`;
//   - the multiline parser built from the rules of every language_exceptions
//     group, in configuration order.
func (p ParseMultiline) Components(ctx context.Context, tag, uid string) []fluentbit.Component {
	// Fluent Bit's multiline parser currently can't export using `message` as key,
	// so every pipeline gets one renaming component first.
	// Drop this rename once https://github.com/fluent/fluent-bit/issues/4795 is fixed.
	rename := modify.NewRenameOptions("log", "message")
	out := []fluentbit.Component{rename.Component(tag)}

	var rules []string
	for _, group := range p.MultilineGroups {
		if group.Type != "language_exceptions" {
			continue
		}
		rules = append(rules, multilineRulesLanguageMap[group.Language]...)
	}

	return append(out, fluentbit.ParseMultilineComponent(tag, uid, rules)...)
}
func init() {
LoggingProcessorTypes.RegisterType(func() LoggingProcessor { return &ParseMultiline{} })
}
// ParserShared holds common parameters that are used by all processors that are implemented with fluentbit's "parser" filter.
type ParserShared struct {
// TimeKey names the field that holds the timestamp. When empty, no timestamp is parsed.
// It must be set whenever TimeFormat is set.
TimeKey string `yaml:"time_key,omitempty" validate:"required_with=TimeFormat,omitempty,fieldlegacy"` // by default does not parse timestamp
// TimeFormat is the format used to parse the TimeKey field; it must be set whenever TimeKey is set.
TimeFormat string `yaml:"time_format,omitempty" validate:"required_with=TimeKey"` // must be provided if time_key is present
// Types allows parsing the extracted fields.
// Not exposed to users for now, but can be used by app receivers.
// Documented at https://docs.fluentbit.io/manual/v/1.3/parser
// According to docs, this is only supported with `ltsv`, `logfmt`, and `regex` parsers.
// Keys are field names; values are one of string, integer, bool, float, hex.
Types map[string]string `yaml:"-" validate:"dive,oneof=string integer bool float hex"`
}
// Component builds the fluent-bit parser component for these shared settings.
// It returns the component together with the parser name that filters use to
// refer to it.
func (p ParserShared) Component(tag, uid string) (fluentbit.Component, string) {
	component, name := fluentbit.ParserComponentBase(p.TimeFormat, p.TimeKey, p.Types, tag, uid)
	return component, name
}
// TimestampStatements returns OTTL statements that parse the TimeKey field with
// TimeFormat into the record's `time`. On success the source field is removed;
// on failure it is left in place. Returns (nil, nil) when TimeKey is empty.
func (p ParserShared) TimestampStatements() (ottl.Statements, error) {
	if p.TimeKey == "" {
		return nil, nil
	}
	member, err := filter.NewMemberLegacy(p.TimeKey)
	if err != nil {
		return nil, err
	}
	source, err := member.OTTLAccessor()
	if err != nil {
		return nil, err
	}

	// fluent-bit preserves the existing field when the time is unparsable. The
	// result of `ToTime` cannot be stored in `cache`, so a boolean flag records
	// whether parsing succeeded.
	valid := ottl.LValue{"cache", "__time_valid"}
	parsed := ottl.ToTime(source, p.TimeFormat)
	isValid := ottl.Equals(valid, ottl.True())

	statements := ottl.NewStatements(
		valid.Set(ottl.False()),
		valid.SetIf(ottl.True(), ottl.And(source.IsPresent(), ottl.IsNotNil(parsed))),
		ottl.LValue{"time"}.SetIf(parsed, isValid),
		source.DeleteIf(isValid),
	)
	return statements, nil
}
// TypesStatements returns OTTL statements that convert each field listed in
// p.Types to its declared type.
//
// Fields are processed in sorted name order. Go randomizes map iteration order,
// and ranging over p.Types directly would make the generated configuration, and
// which unsupported type gets reported, differ between runs.
//
// An error is returned if a field name cannot be parsed, or if a field's type has
// no OTTL conversion ("hex" or any unrecognized type).
func (p ParserShared) TypesStatements() (ottl.Statements, error) {
	fields := make([]string, 0, len(p.Types))
	for field := range p.Types {
		fields = append(fields, field)
	}
	sort.Strings(fields)

	var out ottl.Statements
	for _, field := range fields {
		fieldType := p.Types[field]
		m, err := filter.NewMemberLegacy(field)
		if err != nil {
			return nil, err
		}
		a, err := m.OTTLAccessor()
		if err != nil {
			return nil, err
		}
		// See OTTL docs at https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/ottlfuncs
		switch fieldType {
		case "string":
			out = out.Append(a.Set(ottl.ToString(a)))
		case "integer":
			out = out.Append(a.Set(ottl.ToInt(a)))
		case "bool":
			out = out.Append(a.SetToBool(a))
		case "float":
			out = out.Append(a.Set(ottl.ToFloat(a)))
		case "hex":
			// TODO: Not exposed in OTTL
			fallthrough
		default:
			return nil, fmt.Errorf("type %q not supported for field %s", fieldType, m)
		}
	}
	return out, nil
}
// FluentBitSpecialFieldsStatements returns OTTL statements that handle the
// special fields documented at
// https://cloud.google.com/stackdriver/docs/solutions/agents/ops-agent/configuration#special-fields
//
// Labels are merged into attributes first so that the remaining special fields
// can override them. Every other special field is then moved out of jsonPayload,
// in sorted name order.
func (p ParserShared) FluentBitSpecialFieldsStatements(ctx context.Context) ottl.Statements {
	special := filter.FluentBitSpecialFields()

	keys := make([]string, 0, len(special))
	for key, dest := range special {
		if dest != "labels" {
			keys = append(keys, key)
		}
	}
	sort.Strings(keys)

	labels := ottl.LValue{"body", "logging.googleapis.com/labels"}
	out := ottl.NewStatements(
		ottl.LValue{"attributes"}.MergeMaps(labels, "upsert"),
		labels.Delete(),
	)
	for _, key := range keys {
		move := LoggingProcessorModifyFields{Fields: map[string]*ModifyField{
			special[key]: {MoveFrom: fmt.Sprintf(`jsonPayload.%q`, key)},
		}}
		s, err := move.statements(ctx)
		if err != nil {
			// The inputs are fixed, so this should be impossible.
			panic(err)
		}
		out = out.Append(s)
	}
	return out
}
// A LoggingProcessorParseJson parses the specified field as JSON.
// In the OTel path (Processors), an empty Field means jsonPayload.message.
type LoggingProcessorParseJson struct {
ConfigComponent `yaml:",inline"`
ParserShared `yaml:",inline"`
// Field is the field containing the JSON text (optional).
Field string `yaml:"field,omitempty" validate:"omitempty,fieldlegacy"`
}
// Type returns the configuration type name, "parse_json".
func (r LoggingProcessorParseJson) Type() string {
return "parse_json"
}
// Components returns the fluent-bit configuration for parse_json. It emits the
// filter components that apply the parser to p.Field, followed by the JSON parser
// definition they reference.
func (p LoggingProcessorParseJson) Components(ctx context.Context, tag, uid string) []fluentbit.Component {
	parser, name := p.ParserShared.Component(tag, uid)
	parser.Config["Format"] = "json"
	filters := fluentbit.ParserFilterComponents(tag, p.Field, []string{name}, false)
	return append(append([]fluentbit.Component{}, filters...), parser)
}
// Processors returns the OTel transform processor implementing parse_json.
//
// The source field (Field, or jsonPayload.message when Field is empty) is parsed
// as JSON into a cache slot. When that succeeds, the source field is removed and
// the parsed keys are upserted into the log body. Timestamp parsing, type
// conversions and special-field handling from ParserShared are applied afterwards.
func (p LoggingProcessorParseJson) Processors(ctx context.Context) ([]otel.Component, error) {
	source := p.Field
	if source == "" {
		source = "jsonPayload.message"
	}
	member, err := filter.NewMemberLegacy(source)
	if err != nil {
		return nil, err
	}
	accessor, err := member.OTTLAccessor()
	if err != nil {
		return nil, err
	}

	parsed := ottl.LValue{"cache", "__parsed_json"}
	statements := ottl.NewStatements(
		parsed.SetIf(ottl.ParseJSON(accessor), accessor.IsPresent()),
		accessor.DeleteIf(parsed.IsPresent()),
		ottl.LValue{"body"}.MergeMapsIf(parsed, "upsert", parsed.IsPresent()),
		parsed.Delete(),
	)

	builders := []func() (ottl.Statements, error){p.TimestampStatements, p.TypesStatements}
	for _, build := range builders {
		extra, buildErr := build()
		if buildErr != nil {
			return nil, buildErr
		}
		statements = statements.Append(extra)
	}
	statements = statements.Append(p.FluentBitSpecialFieldsStatements(ctx))

	return []otel.Component{otel.Transform("log", "log", statements)}, nil
}
func init() {
LoggingProcessorTypes.RegisterType(func() LoggingProcessor { return &LoggingProcessorParseJson{} })
}
// A LoggingProcessorParseRegex applies a regex to the specified field, storing the named capture groups as keys in the log record.
// It is kept alongside LoggingProcessorParseRegexComplex to stay backward compatible with existing configurations.
type LoggingProcessorParseRegex struct {
ConfigComponent `yaml:",inline"`
ParserShared `yaml:",inline"`
// Field is the field the regex is applied to (optional).
Field string `yaml:"field,omitempty" validate:"omitempty,fieldlegacy"`
// PreserveKey cannot be set from YAML; it is forwarded to fluentbit.ParserFilterComponents
// (presumably to keep the parsed field in the record — confirm against that helper).
PreserveKey bool `yaml:"-"`
// Regex is the required pattern; its named capture groups become record keys.
Regex string `yaml:"regex,omitempty" validate:"required"`
}
// Type returns the configuration type name, "parse_regex".
func (r LoggingProcessorParseRegex) Type() string {
return "parse_regex"
}
// Components returns the fluent-bit configuration for parse_regex. It emits the
// regex parser definition, followed by the filter components that apply it to
// p.Field.
func (p LoggingProcessorParseRegex) Components(ctx context.Context, tag, uid string) []fluentbit.Component {
	parser, name := p.ParserShared.Component(tag, uid)
	parser.Config["Format"] = "regex"
	parser.Config["Regex"] = p.Regex
	out := []fluentbit.Component{parser}
	return append(out, fluentbit.ParserFilterComponents(tag, p.Field, []string{name}, p.PreserveKey)...)
}
// RegexParser pairs one regex with the shared parser settings used for it.
type RegexParser struct {
Regex string
Parser ParserShared
}
// A LoggingProcessorParseRegexComplex applies a set of regexes to the specified field, storing the named capture groups as keys in the log record.
// It has no YAML tags, so it is presumably constructed in code rather than configured directly — confirm.
type LoggingProcessorParseRegexComplex struct {
// Field is the field the regexes are applied to.
Field string
// Parsers are handed to the parser filter in this order.
Parsers []RegexParser
}
// Components emits one fluent-bit regex parser per entry of p.Parsers, followed
// by the filter components that apply all of those parsers, in order, to p.Field.
// Parser i is built with the uid "<uid>.<i>", presumably so that each parser
// gets a distinct name.
func (p LoggingProcessorParseRegexComplex) Components(ctx context.Context, tag, uid string) []fluentbit.Component {
	out := make([]fluentbit.Component, 0, len(p.Parsers))
	names := make([]string, 0, len(p.Parsers))
	for i := range p.Parsers {
		rp := p.Parsers[i]
		parser, name := rp.Parser.Component(tag, fmt.Sprintf("%s.%d", uid, i))
		parser.Config["Format"] = "regex"
		parser.Config["Regex"] = rp.Regex
		out = append(out, parser)
		names = append(names, name)
	}
	return append(out, fluentbit.ParserFilterComponents(tag, p.Field, names, false)...)
}
// MultilineRule is one rule of a fluent-bit multiline parser: in state StateName,
// a line matching Regex moves the parser to NextState.
type MultilineRule struct {
	StateName string
	Regex     string
	NextState string
}

// AsString renders the rule as `"state" "regex" "next"`. Every field is wrapped
// in double quotes, so any double quote inside Regex is backslash-escaped.
func (r MultilineRule) AsString() string {
	parts := []string{r.StateName, strings.ReplaceAll(r.Regex, `"`, `\"`), r.NextState}
	quoted := make([]string, len(parts))
	for i, part := range parts {
		quoted[i] = fmt.Sprintf(`"%s"`, part)
	}
	return strings.Join(quoted, " ")
}
// A LoggingProcessorParseMultilineRegex runs fluent-bit's multiline filter with a
// parser built from Rules, then applies the regex parsers of the embedded
// LoggingProcessorParseRegexComplex to Field. The named capture groups are stored
// as keys in the log record.
//
// Rules are emitted as fluent-bit multiline parser rules, for example:
//
// #
// # Regex rules for multiline parsing
// # ---------------------------------
// #
// # configuration hints:
// #
// # - first state always has the name: start_state
// # - every field in the rule must be inside double quotes
// #
// # rules | state name | regex pattern | next state
// # ------|---------------|--------------------------------------------
// rule "start_state" "/(Dec \d+ \d+\:\d+\:\d+)(.*)/" "cont"
// rule "cont" "/^\s+at.*/" "cont"
type LoggingProcessorParseMultilineRegex struct {
LoggingProcessorParseRegexComplex
// Rules are written to the multiline parser in this order.
Rules []MultilineRule
}
// Components returns the fluent-bit configuration for a multiline regex parser,
// in this order:
//  1. a "multiline" FILTER reading p.Field (default "message") with the parser below;
//  2. the MULTILINE_PARSER holding p.Rules in order;
//  3. the components of the embedded LoggingProcessorParseRegexComplex.
func (p LoggingProcessorParseMultilineRegex) Components(ctx context.Context, tag, uid string) []fluentbit.Component {
	multilineParserName := fmt.Sprintf("%s.%s.multiline", tag, uid)

	// OrderedConfig keeps the `rule` entries in the order they were configured.
	rules := make([][2]string, 0, len(p.Rules))
	for _, rule := range p.Rules {
		rules = append(rules, [2]string{"rule", rule.AsString()})
	}

	keyContent := "message"
	if p.Field != "" {
		keyContent = p.Field
	}

	// Named multilineFilter (not `filter`) to avoid shadowing the imported filter package.
	multilineFilter := fluentbit.Component{
		Kind: "FILTER",
		Config: map[string]string{
			"Name":                  "multiline",
			"Match":                 tag,
			"Multiline.Key_Content": keyContent,
			"Multiline.Parser":      multilineParserName,
		},
	}
	multilineParser := fluentbit.Component{
		Kind: "MULTILINE_PARSER",
		Config: map[string]string{
			"Name": multilineParserName,
			"Type": "regex",
		},
		OrderedConfig: rules,
	}
	return append([]fluentbit.Component{multilineFilter, multilineParser}, p.LoggingProcessorParseRegexComplex.Components(ctx, tag, uid)...)
}
func init() {
LoggingProcessorTypes.RegisterType(func() LoggingProcessor { return &LoggingProcessorParseRegex{} })
}
// LoggingProcessorNestWildcard configures a fluent-bit "nest" filter with
// Operation "nest" and the Wildcard, Nest_under and Remove_prefix settings below.
type LoggingProcessorNestWildcard struct {
	// Wildcard is written to the filter's Wildcard setting.
	Wildcard string
	// NestUnder is written to the filter's Nest_under setting.
	NestUnder string
	// RemovePrefix is written to the filter's Remove_prefix setting (always emitted, even when empty).
	RemovePrefix string
}

// Components returns a single fluent-bit nest FILTER matching tag.
func (p LoggingProcessorNestWildcard) Components(ctx context.Context, tag, uid string) []fluentbit.Component {
	// Named nest (not `filter`) to avoid shadowing the imported filter package.
	nest := fluentbit.Component{
		Kind: "FILTER",
		Config: map[string]string{
			"Name":          "nest",
			"Match":         tag,
			"Operation":     "nest",
			"Wildcard":      p.Wildcard,
			"Nest_under":    p.NestUnder,
			"Remove_prefix": p.RemovePrefix,
		},
	}
	return []fluentbit.Component{nest}
}
// LegacyBuiltinProcessors maps the legacy "lib:*" processor names to predefined
// parse_regex processors. Entries with a ParserShared also parse the captured
// `time` field using the given TimeFormat.
var LegacyBuiltinProcessors = map[string]LoggingProcessor{
// Captures the whole line as `message`.
"lib:default_message_parser": &LoggingProcessorParseRegex{
Regex: `^(?<message>.*)$`,
},
"lib:apache": &LoggingProcessorParseRegex{
Regex: `^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$`,
ParserShared: ParserShared{
TimeKey: "time",
TimeFormat: "%d/%b/%Y:%H:%M:%S %z",
},
},
"lib:apache2": &LoggingProcessorParseRegex{
Regex: `^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>.*)")?$`,
ParserShared: ParserShared{
TimeKey: "time",
TimeFormat: "%d/%b/%Y:%H:%M:%S %z",
},
},
// No TimeKey: the captured `time` field is left unparsed.
"lib:apache_error": &LoggingProcessorParseRegex{
Regex: `^\[[^ ]* (?<time>[^\]]*)\] \[(?<level>[^\]]*)\](?: \[pid (?<pid>[^\]]*)\])?( \[client (?<client>[^\]]*)\])? (?<message>.*)$`,
},
// NOTE(review): `(:?ms)?` matches an optional ":" before "ms"; it may have been meant as the non-capturing `(?:ms)?` — confirm.
"lib:mongodb": &LoggingProcessorParseRegex{
Regex: `^(?<time>[^ ]*)\s+(?<severity>\w)\s+(?<component>[^ ]+)\s+\[(?<context>[^\]]+)]\s+(?<message>.*?) *(?<ms>(\d+))?(:?ms)?$`,
ParserShared: ParserShared{
TimeKey: "time",
TimeFormat: "%Y-%m-%dT%H:%M:%S.%L",
},
},
// NOTE(review): unlike the apache regexes, this one has no trailing `$` anchor.
"lib:nginx": &LoggingProcessorParseRegex{
Regex: `^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")`,
ParserShared: ParserShared{
TimeKey: "time",
TimeFormat: "%d/%b/%Y:%H:%M:%S %z",
},
},
"lib:syslog-rfc5424": &LoggingProcessorParseRegex{
Regex: `^\<(?<pri>[0-9]{1,5})\>1 (?<time>[^ ]+) (?<host>[^ ]+) (?<ident>[^ ]+) (?<pid>[-0-9]+) (?<msgid>[^ ]+) (?<extradata>(\[(.*?)\]|-)) (?<message>.+)$`,
ParserShared: ParserShared{
TimeKey: "time",
TimeFormat: "%Y-%m-%dT%H:%M:%S.%L%Z",
},
},
// NOTE(review): this regex is wrapped in `/.../`, unlike the others; presumably the parser strips the slashes — confirm.
"lib:syslog-rfc3164": &LoggingProcessorParseRegex{
Regex: `/^\<(?<pri>[0-9]+)\>(?<time>[^ ]* {1,2}[^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$/`,
ParserShared: ParserShared{
TimeKey: "time",
TimeFormat: "%b %d %H:%M:%S",
},
},
}
// A LoggingProcessorExcludeLogs filters out logs according to a pattern.
// A log record is dropped when it matches any of the MatchAny conditions.
type LoggingProcessorExcludeLogs struct {
ConfigComponent `yaml:",inline"`
// MatchAny lists filter expressions; each one is parsed with filter.NewFilter.
MatchAny []string `yaml:"match_any" validate:"required,dive,filter"`
}
// Type returns the configuration type name, "exclude_logs".
func (p LoggingProcessorExcludeLogs) Type() string {
return "exclude_logs"
}
// filters parses each MatchAny condition into a *filter.Filter, preserving order.
//
// On failure it returns an error naming the first condition that could not be
// parsed. The underlying parse error is wrapped with %w, so callers can inspect it
// with errors.Is / errors.As. The message text is unchanged.
func (p LoggingProcessorExcludeLogs) filters() ([]*filter.Filter, error) {
	parsed := make([]*filter.Filter, 0, len(p.MatchAny))
	for _, condition := range p.MatchAny {
		// Named f (not `filter`) to avoid shadowing the filter package.
		f, err := filter.NewFilter(condition)
		if err != nil {
			return nil, fmt.Errorf("error parsing condition '%s': %w", condition, err)
		}
		parsed = append(parsed, f)
	}
	return parsed, nil
}
// Components returns the fluent-bit configuration for exclude_logs: the
// components generated for the combined condition, followed by a Lua filter
// that drops every record for which `match` is true.
func (p LoggingProcessorExcludeLogs) Components(ctx context.Context, tag, uid string) []fluentbit.Component {
	conditions, err := p.filters()
	if err != nil {
		// Presumably unreachable: the `filter` validation tag on MatchAny is expected
		// to reject unparsable conditions before components are generated.
		panic(err)
	}
	components, luaMatch := filter.AllFluentConfig(tag, map[string]*filter.Filter{
		"match": filter.MatchesAny(conditions),
	})
	// Per the Fluent Bit Lua filter docs, returning -1 drops the record; 2 keeps it
	// with the original timestamp.
	script := fmt.Sprintf(`
function process(tag, timestamp, record)
%s
if match then
return -1, 0, 0
end
return 2, 0, record
end`, luaMatch)
	return append(components, fluentbit.LuaFilterComponents(tag, "process", script)...)
}
// Processors returns the OTel filter processor for exclude_logs, with one OTTL
// expression per MatchAny condition. Log records matching any expression are
// excluded.
func (p LoggingProcessorExcludeLogs) Processors(ctx context.Context) ([]otel.Component, error) {
	conditions, err := p.filters()
	if err != nil {
		return nil, err
	}
	var expressions []ottl.Value
	for _, cond := range conditions {
		expr, exprErr := cond.OTTLExpression()
		if exprErr != nil {
			return nil, fmt.Errorf("failed to process condition %q: %w", cond, exprErr)
		}
		expressions = append(expressions, expr)
	}
	return []otel.Component{otel.Filter("logs", "log_record", expressions)}, nil
}
func init() {
LoggingProcessorTypes.RegisterType(func() LoggingProcessor { return &LoggingProcessorExcludeLogs{} })
}