confgenerator/logging_modify_fields.go (339 lines of code) (raw):

// Copyright 2022 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package confgenerator import ( "context" "fmt" "slices" "sort" "strings" "github.com/GoogleCloudPlatform/ops-agent/confgenerator/filter" "github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit" "github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel" "github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel/ottl" ) type ModifyField struct { // Source of value for this field MoveFrom string `yaml:"move_from" validate:"omitempty,field,excluded_with=CopyFrom StaticValue"` CopyFrom string `yaml:"copy_from" validate:"omitempty,field,excluded_with=MoveFrom StaticValue"` StaticValue *string `yaml:"static_value" validate:"excluded_with=MoveFrom CopyFrom DefaultValue"` DefaultValue *string `yaml:"default_value" validate:"excluded_with=StaticValue"` // OTTL expression with copied value sourceValue ottl.Value `yaml:"-"` // Name of Lua variable with copied value sourceVar string `yaml:"-"` // Name of Lua variable with omit boolean omitVar string `yaml:"-"` // Operations to perform Type string `yaml:"type" validate:"omitempty,oneof=integer float"` CustomConvertFunc func(ottl.LValue) ottl.Statements `yaml:"-"` OmitIf string `yaml:"omit_if" validate:"omitempty,filter"` MapValues map[string]string `yaml:"map_values"` // In case the source field's value does not match any keys specified in the map_values pairs, // the destination field will be forcefully unset if map_values_exclusive is true, // or left untouched if map_values_exclusive is false. MapValuesExclusive bool `yaml:"map_values_exclusive" validate:"excluded_without=MapValues"` } type LoggingProcessorModifyFields struct { ConfigComponent `yaml:",inline"` Fields map[string]*ModifyField `yaml:"fields" validate:"dive,keys,field,distinctfield,writablefield,endkeys" tracking:"-"` // For use by other processors, if set this will clear out `jsonPayload`, leaving only the fields set above. // Only supported in OTel. EmptyBody bool `yaml:"-" tracking:"-"` } func (p LoggingProcessorModifyFields) Type() string { return "modify_fields" } func (p LoggingProcessorModifyFields) Components(ctx context.Context, tag, uid string) []fluentbit.Component { c, err := p.components(tag, uid) if err != nil { // It shouldn't be possible to get here if the input validation is working, so treat this as a code bug. panic(err) } return c } func (p LoggingProcessorModifyFields) components(tag, uid string) ([]fluentbit.Component, error) { var lua strings.Builder lua.WriteString(` function process(tag, timestamp, record) `) var components []fluentbit.Component // Step 1: Obtain any source values needed for move or copy fieldMappings := map[string]string{} moveFromFields := []string{} var dests []string for dest, field := range p.Fields { if field == nil { // Nothing to do for this field continue } dests = append(dests, dest) } sort.Strings(dests) omitFilters := map[string]*filter.Filter{} for i, dest := range dests { field := p.Fields[dest] if field.MoveFrom == "" && field.CopyFrom == "" && field.StaticValue == nil { // Default to modifying field in place field.CopyFrom = dest } for j, name := range []*string{&field.MoveFrom, &field.CopyFrom} { if *name == "" { continue } m, err := filter.NewMember(*name) if err != nil { return nil, fmt.Errorf("failed to parse field %q: %w", *name, err) } key, err := m.LuaAccessor(false) if err != nil { return nil, fmt.Errorf("failed to convert field %q to Lua accessor: %w", *name, err) } if _, ok := fieldMappings[key]; !ok { new := fmt.Sprintf("__field_%d", i) fieldMappings[key] = new fmt.Fprintf(&lua, "local %s = %s;\n", new, key) } field.sourceVar = fieldMappings[key] if j == 0 { ra, err := m.LuaAccessor(true) if err != nil { return nil, fmt.Errorf("failed to convert %v to Lua accessor: %w", m, err) } moveFromFields = append(moveFromFields, ra) } } if field.OmitIf != "" { f, err := filter.NewFilter(field.OmitIf) if err != nil { return nil, fmt.Errorf("failed to parse filter %q: %w", field.OmitIf, err) } field.omitVar = fmt.Sprintf("omit%d", i) omitFilters[field.omitVar] = f } } // Step 2: OmitIf conditions if len(omitFilters) > 0 { fcomponents, flua := filter.AllFluentConfig(tag, omitFilters) components = append(components, fcomponents...) lua.WriteString(flua) } // Step 3: Remove any MoveFrom fields sort.Strings(moveFromFields) last := "" for _, ra := range moveFromFields { if last != ra { fmt.Fprintf(&lua, `%s(nil); `, ra) } last = ra } // Step 4: Assign values for _, dest := range dests { field := p.Fields[dest] outM, err := filter.NewMember(dest) if err != nil { return nil, fmt.Errorf("failed to parse output field %q: %w", dest, err) } src := "nil" if field.sourceVar != "" { src = field.sourceVar } if field.StaticValue != nil { src = filter.LuaQuote(*field.StaticValue) } fmt.Fprintf(&lua, "local v = %s;\n", src) if field.DefaultValue != nil { fmt.Fprintf(&lua, "if v == nil then v = %s end;\n", filter.LuaQuote(*field.DefaultValue)) } // Process MapValues var keys []string for k := range field.MapValues { keys = append(keys, k) } sort.Strings(keys) for i, k := range keys { if i > 0 { lua.WriteString("else") } fmt.Fprintf(&lua, "if v == %s then v = %s\n", filter.LuaQuote(k), filter.LuaQuote(field.MapValues[k])) } if len(keys) > 0 { if field.MapValuesExclusive { lua.WriteString("else v = nil\n") } lua.WriteString("end\n") } // Process Type var conv string switch field.Type { case "integer": // Fluent-bit currently targets Lua 5.1, which uses the same type for numbers and integers. // When converting back to msgpack, if a number can be represented as an integer, fluent-bit does so, otherwise it uses a float. // If fluent-bit ever supports Lua 5.3, we can switch this to math.tointeger and use proper integers. conv = "math.floor(tonumber(v))" case "float": conv = "tonumber(v)" case "YesNoBoolean": // Used by the mysql logging receiver; not allowed in user config by validation. // First we check if v is truthy according to Lua (i.e. not nil). // The "and" operator returns the first argument's value if it is false (so nil), // so this expression produces true, false, or nil depending on the input. conv = `(v and v == "Yes")` } if conv != "" { // Leave existing string value if not convertible fmt.Fprintf(&lua, ` local v2 = %s if v2 ~= fail then v = v2 end `, conv) } // Omit if if field.omitVar != "" { fmt.Fprintf(&lua, "if %s then v = nil end;\n", field.omitVar) } ra, err := outM.LuaAccessor(true) if err != nil { return nil, fmt.Errorf("failed to convert %v to Lua accessor: %w", outM, err) } fmt.Fprintf(&lua, "%s(v)\n", ra) } lua.WriteString("return 2, timestamp, record\n") lua.WriteString("end\n") // Execute Lua code components = append(components, fluentbit.LuaFilterComponents(tag, "process", lua.String())...) return components, nil } func (p LoggingProcessorModifyFields) Processors(ctx context.Context) ([]otel.Component, error) { out, err := p.statements(ctx) if err != nil { return nil, err } return []otel.Component{otel.Transform( "log", "log", out, )}, nil } func (p LoggingProcessorModifyFields) statements(_ context.Context) (ottl.Statements, error) { var statements ottl.Statements var dests []string for dest, field := range p.Fields { if field == nil { // Nothing to do for this field continue } dests = append(dests, dest) } sort.Strings(dests) // map of (dest field as OTTL expression) to (source field as OTTL expression) fieldMappings := map[string]ottl.LValue{} // slice of OTTL fields to delete var moveFromFields []ottl.LValue // map of (variable name) to (filter object) var omitFilters []*filter.Filter for i, dest := range dests { field := p.Fields[dest] if field.MoveFrom == "" && field.CopyFrom == "" && field.StaticValue == nil { // Default to modifying field in place field.CopyFrom = dest } for j, name := range []*string{&field.MoveFrom, &field.CopyFrom} { if *name == "" { continue } m, err := filter.NewMember(*name) if err != nil { return nil, fmt.Errorf("failed to parse field %q: %w", *name, err) } accessor, err := m.OTTLAccessor() if err != nil { return nil, fmt.Errorf("failed to convert field %q to OTTL: %w", *name, err) } key := accessor.String() if _, ok := fieldMappings[key]; !ok { new := ottl.LValue{ "cache", fmt.Sprintf("__field_%d", i), } fieldMappings[key] = new statements = statements.Append( new.Delete(), new.SetIf(accessor, accessor.IsPresent()), ) } field.sourceValue = fieldMappings[key] if j == 0 { // MoveFrom moveFromFields = append(moveFromFields, accessor) } } if field.OmitIf != "" { f, err := filter.NewFilter(field.OmitIf) if err != nil { return nil, fmt.Errorf("failed to parse filter %q: %w", field.OmitIf, err) } field.omitVar = fmt.Sprintf("__omit_%d", len(omitFilters)) omitFilters = append(omitFilters, f) } } // Step 2: OmitIf conditions for i, f := range omitFilters { name := fmt.Sprintf("__omit_%d", i) expr, err := f.OTTLExpression() if err != nil { return nil, fmt.Errorf("failed to parse omit_if condition %q: %w", f, err) } statements = statements.Append( ottl.LValue{"cache", name}.Set(ottl.False()), ottl.LValue{"cache", name}.SetIf(ottl.True(), expr), ) } // Step 3: Remove any MoveFrom fields // Sort first to make the resulting configs deterministic sort.Slice(moveFromFields, func(i, j int) bool { return moveFromFields[i].String() < moveFromFields[j].String() }) var last ottl.LValue for _, v := range moveFromFields { if !slices.Equal(last, v) { statements = statements.Append(v.Delete()) } last = v } if p.EmptyBody { // With no keys, will clear out the body, leaving a copy at cache.body. statements = statements.Append( ottl.LValue{"cache", "body"}.Set(ottl.LValue{"body"}), ottl.LValue{"body"}.KeepKeys(), ) } // Step 4: Assign values for _, dest := range dests { field := p.Fields[dest] outM, err := filter.NewMember(dest) if err != nil { return nil, fmt.Errorf("failed to parse output field %q: %w", dest, err) } src := ottl.Nil() if field.sourceValue != nil { src = field.sourceValue } if field.StaticValue != nil { src = ottl.StringLiteral(*field.StaticValue) } value := ottl.LValue{"cache", "value"} statements = statements.Append( // Set silently fails to set if the value is nil, so we delete first. value.Delete(), value.Set(src), ) if field.DefaultValue != nil { statements = statements.Append( value.SetIfNil(ottl.StringLiteral(*field.DefaultValue)), ) } // Process MapValues if len(field.MapValues) > 0 { mapped_value := ottl.LValue{"cache", "mapped_value"} statements = statements.Append( mapped_value.Delete(), ) if !field.MapValuesExclusive { statements = statements.Append( mapped_value.SetIf(value, value.IsPresent()), ) } var keys []string for k := range field.MapValues { keys = append(keys, k) } sort.Strings(keys) for _, k := range keys { statements = statements.Append( mapped_value.SetIf(ottl.StringLiteral(field.MapValues[k]), ottl.Equals(value, ottl.StringLiteral(k))), ) } value = mapped_value } switch field.Type { case "integer": statements = statements.Append(value.Set(ottl.ToInt(value))) case "float": statements = statements.Append(value.Set(ottl.ToFloat(value))) case "YesNoBoolean": // TODO return nil, fmt.Errorf("YesNoBoolean unsupported") } if field.CustomConvertFunc != nil { statements = statements.Append(field.CustomConvertFunc(value)) } ra, err := outM.OTTLAccessor() if err != nil { return nil, fmt.Errorf("failed to convert %v to OTTL accessor: %w", outM, err) } statements = statements.Append(ra.SetIf(value, value.IsPresent())) if field.omitVar != "" { statements = statements.Append(ra.DeleteIf(ottl.Equals(ottl.LValue{"cache", field.omitVar}, ottl.True()))) } } return statements, nil } func init() { LoggingProcessorTypes.RegisterType(func() LoggingProcessor { return &LoggingProcessorModifyFields{} }) }