confgenerator/fluentbit/processors.go (118 lines of code) (raw):
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package fluentbit provides data structures to represent and generate fluentBit configuration.
package fluentbit
import (
"crypto/md5"
"encoding/hex"
"fmt"
"sort"
"strings"
)
// ParseMultilineComponent constitutes the mulltiline_parser components.
func ParseMultilineComponent(tag string, uid string, languageRules []string) []Component {
var components []Component
multilineParserName := fmt.Sprintf("multiline.%s.%s", tag, uid)
rules := [][2]string{}
for _, rule := range languageRules {
rules = append(rules, [2]string{"rule", rule})
}
multilineParser := Component{
Kind: "MULTILINE_PARSER",
Config: map[string]string{
"name": multilineParserName,
"type": "regex",
"flush_timeout": "1000",
},
OrderedConfig: rules,
}
components = append(components, multilineParser)
return components
}
// TranslationComponents translates SrcVal on key src to DestVal on key dest, if the dest key does not exist.
// If removeSrc is true, the original key is removed when translated.
func TranslationComponents(tag, src, dest string, removeSrc bool, translations []struct{ SrcVal, DestVal string }) []Component {
c := []Component{}
for _, t := range translations {
comp := Component{
Kind: "FILTER",
Config: map[string]string{
"Name": "modify",
"Match": tag,
"Condition": fmt.Sprintf("Key_Value_Equals %s %s", src, t.SrcVal),
"Add": fmt.Sprintf("%s %s", dest, t.DestVal),
},
}
if removeSrc {
comp.Config["Remove"] = src
}
c = append(c, comp)
}
return c
}
// LuaFilterComponents returns components that execute the Lua script given in src on records that match tag.
// TODO(ridwanmsharif): Replace this with in-config script when
//
// fluent/fluent-bit#4634 is supported.
func LuaFilterComponents(tag, function, src string) []Component {
hasher := md5.New()
hasher.Write([]byte(src))
hash := hex.EncodeToString(hasher.Sum(nil))
filename := fmt.Sprintf("%s.lua", hash)
return []Component{
{
Kind: "FILTER",
Config: map[string]string{
"Name": "lua",
"Match": tag,
"script": filename,
"call": function,
},
},
outputFileComponent(filename, src),
}
}
// The parser component is incomplete and needs (at a minimum) the "Format" key to be set.
func ParserComponentBase(TimeFormat string, TimeKey string, Types map[string]string, tag string, uid string) (Component, string) {
parserName := fmt.Sprintf("%s.%s", tag, uid)
parser := Component{
Kind: "PARSER",
Config: map[string]string{
"Name": parserName,
},
}
if TimeFormat != "" {
parser.Config["Time_Format"] = TimeFormat
}
if TimeKey != "" {
parser.Config["Time_Key"] = TimeKey
}
if len(Types) > 0 {
var types []string
for k, v := range Types {
types = append(types, fmt.Sprintf("%s:%s", k, v))
}
sort.Strings(types)
parser.Config["Types"] = strings.Join(types, " ")
}
return parser, parserName
}
func ParserFilterComponents(tag string, field string, parserNames []string, preserveKey bool) []Component {
parsers := [][2]string{}
for _, name := range parserNames {
parsers = append(parsers, [2]string{"Parser", name})
}
parseKey := "message"
if field != "" {
parseKey = field
}
nestFilters := LuaFilterComponents(tag, ParserNestLuaFunction, fmt.Sprintf(ParserNestLuaScriptContents, parseKey))
filter := Component{
Kind: "FILTER",
Config: map[string]string{
"Match": tag,
"Name": "parser",
"Key_Name": parseKey, // Required
// We need to preserve existing fields (like LogName) that are present
// before parsing.
"Reserve_Data": "True",
},
OrderedConfig: parsers,
}
if preserveKey {
filter.Config["Preserve_Key"] = "True"
}
mergeFilters := LuaFilterComponents(tag, ParserMergeLuaFunction, ParserMergeLuaScriptContents)
parseFilters := []Component{}
parseFilters = append(parseFilters, nestFilters...)
parseFilters = append(parseFilters, filter)
parseFilters = append(parseFilters, mergeFilters...)
return parseFilters
}