apps/mongodb.go (262 lines of code) (raw):

// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package apps import ( "context" "fmt" "strings" "github.com/GoogleCloudPlatform/ops-agent/confgenerator" "github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit" "github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit/modify" "github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel" "github.com/GoogleCloudPlatform/ops-agent/internal/secret" ) type MetricsReceiverMongoDB struct { confgenerator.ConfigComponent `yaml:",inline"` confgenerator.MetricsReceiverSharedTLS `yaml:",inline"` confgenerator.MetricsReceiverShared `yaml:",inline"` Endpoint string `yaml:"endpoint,omitempty"` Username string `yaml:"username,omitempty"` Password secret.String `yaml:"password,omitempty"` } type MetricsReceiverMongoDBHosts struct { Endpoint string `yaml:"endpoint"` Transport string `yaml:"transport"` } const defaultMongodbEndpoint = "localhost:27017" func (r MetricsReceiverMongoDB) Type() string { return "mongodb" } func (r MetricsReceiverMongoDB) Pipelines(_ context.Context) ([]otel.ReceiverPipeline, error) { transport := "tcp" if r.Endpoint == "" { r.Endpoint = defaultMongodbEndpoint } else if strings.HasSuffix(r.Endpoint, ".sock") { transport = "unix" } hosts := []MetricsReceiverMongoDBHosts{ { r.Endpoint, transport, }, } config := map[string]interface{}{ "hosts": hosts, "username": r.Username, "password": r.Password.SecretValue(), "collection_interval": r.CollectionIntervalString(), } if transport != "unix" { config["tls"] = r.TLSConfig(false) } return []otel.ReceiverPipeline{{ Receiver: otel.Component{ Type: r.Type(), Config: config, }, Processors: map[string][]otel.Component{"metrics": { otel.NormalizeSums(), otel.MetricsTransform( otel.AddPrefix("workload.googleapis.com"), ), otel.ModifyInstrumentationScope(r.Type(), "1.0"), }}, }}, nil } func init() { confgenerator.MetricsReceiverTypes.RegisterType(func() confgenerator.MetricsReceiver { return &MetricsReceiverMongoDB{} }) } type LoggingProcessorMongodb struct { confgenerator.ConfigComponent `yaml:",inline"` } func (*LoggingProcessorMongodb) Type() string { return "mongodb" } func (p *LoggingProcessorMongodb) Components(ctx context.Context, tag, uid string) []fluentbit.Component { c := []fluentbit.Component{} c = append(c, p.JsonLogComponents(ctx, tag, uid)...) c = append(c, p.RegexLogComponents(tag, uid)...) c = append(c, p.severityParser(ctx, tag, uid)...) return c } // JsonLogComponents are the fluentbit components for parsing log messages that are json formatted. // these are generally messages from mongo with versions greater than or equal to 4.4 // documentation: https://docs.mongodb.com/v4.4/reference/log-messages/#log-message-format func (p *LoggingProcessorMongodb) JsonLogComponents(ctx context.Context, tag, uid string) []fluentbit.Component { c := p.jsonParserWithTimeKey(ctx, tag, uid) c = append(c, p.promoteWiredTiger(tag, uid)...) c = append(c, p.renames(tag, uid)...) return c } // jsonParserWithTimeKey requires promotion of the nested timekey for the json parser so we must // first promote the $date field from the "t" field before declaring the parser func (p *LoggingProcessorMongodb) jsonParserWithTimeKey(ctx context.Context, tag, uid string) []fluentbit.Component { c := []fluentbit.Component{} jsonParser := &confgenerator.LoggingProcessorParseJson{ ParserShared: confgenerator.ParserShared{ TimeKey: "time", TimeFormat: "%Y-%m-%dT%H:%M:%S.%L%z", Types: map[string]string{ "id": "integer", "message": "string", }, }, } jpComponents := jsonParser.Components(ctx, tag, uid) // The parserFilterComponent is the actual filter component that configures and defines // which parser to use. We need the component to determine which parser to use when // re-parsing below. Each time a parser filter is used, there are 2 filter components right // before it to account for the nest lua script (see confgenerator/fluentbit/parse_deduplication.go). // Therefore, the parse filter component is actually the third component in the list. parserFilterComponent := jpComponents[2] c = append(c, jpComponents...) tempPrefix := "temp_ts_" timeKey := "time" // have to bring $date to top level in order for it to be parsed as timeKey // see https://github.com/fluent/fluent-bit/issues/1013 liftTs := fluentbit.Component{ Kind: "FILTER", Config: map[string]string{ "Name": "nest", "Match": tag, "Operation": "lift", "Nested_under": "t", "Add_prefix": tempPrefix, }, } renameTsOption := modify.NewHardRenameOptions(fmt.Sprintf("%s$date", tempPrefix), timeKey) renameTs := renameTsOption.Component(tag) c = append(c, liftTs, renameTs) // IMPORTANT: now that we have lifted the json to top level // we need to re-parse in order to properly set time at the // parser level nestFilters := fluentbit.LuaFilterComponents(tag, fluentbit.ParserNestLuaFunction, fmt.Sprintf(fluentbit.ParserNestLuaScriptContents, "message")) parserFilter := fluentbit.Component{ Kind: "FILTER", Config: map[string]string{ "Name": "parser", "Match": tag, "Key_Name": "message", "Reserve_Data": "True", "Parser": parserFilterComponent.OrderedConfig[0][1], }, } mergeFilters := fluentbit.LuaFilterComponents(tag, fluentbit.ParserMergeLuaFunction, fluentbit.ParserMergeLuaScriptContents) c = append(c, nestFilters...) c = append(c, parserFilter) c = append(c, mergeFilters...) removeTimestamp := fluentbit.Component{ Kind: "FILTER", Config: map[string]string{ "Name": "modify", "Match": tag, "Remove": timeKey, }, } c = append(c, removeTimestamp) return c } // severityParser is used by both regex and json parser to ensure an "s" field on the entry gets translated // to a valid logging.googleapis.com/seveirty field func (p *LoggingProcessorMongodb) severityParser(ctx context.Context, tag, uid string) []fluentbit.Component { severityComponents := []fluentbit.Component{} severityComponents = append(severityComponents, confgenerator.LoggingProcessorModifyFields{ Fields: map[string]*confgenerator.ModifyField{ "jsonPayload.severity": { MoveFrom: "jsonPayload.s", }, "severity": { CopyFrom: "jsonPayload.s", MapValues: map[string]string{ "D": "DEBUG", "D1": "DEBUG", "D2": "DEBUG", "D3": "DEBUG", "D4": "DEBUG", "D5": "DEBUG", "I": "INFO", "E": "ERROR", "F": "FATAL", "W": "WARNING", }, MapValuesExclusive: true, }, InstrumentationSourceLabel: instrumentationSourceValue(p.Type()), }, }.Components(ctx, tag, uid)..., ) return severityComponents } func (p *LoggingProcessorMongodb) renames(tag, uid string) []fluentbit.Component { r := []fluentbit.Component{} renames := []struct { src string dest string }{ {"c", "component"}, {"ctx", "context"}, {"msg", "message"}, } for _, rename := range renames { rename := modify.NewRenameOptions(rename.src, rename.dest) r = append(r, rename.Component(tag)) } return r } func (p *LoggingProcessorMongodb) promoteWiredTiger(tag, uid string) []fluentbit.Component { // promote messages that are WiredTiger messages and are nested in attr.message addPrefix := "temp_attributes_" upNest := fluentbit.Component{ Kind: "FILTER", Config: map[string]string{ "Name": "nest", "Match": tag, "Operation": "lift", "Nested_under": "attr", "Add_prefix": addPrefix, }, } hardRenameMessage := modify.NewHardRenameOptions(fmt.Sprintf("%smessage", addPrefix), "msg") wiredTigerRename := hardRenameMessage.Component(tag) renameRemainingAttributes := fluentbit.Component{ Kind: "FILTER", Config: map[string]string{ "Name": "nest", "Wildcard": fmt.Sprintf("%s*", addPrefix), "Match": tag, "Operation": "nest", "Nest_under": "attributes", "Remove_prefix": addPrefix, }, } return []fluentbit.Component{upNest, wiredTigerRename, renameRemainingAttributes} } func (p *LoggingProcessorMongodb) RegexLogComponents(tag, uid string) []fluentbit.Component { c := []fluentbit.Component{} parseKey := "message" parser, parserName := fluentbit.ParserComponentBase("%Y-%m-%dT%H:%M:%S.%L%z", "timestamp", map[string]string{ "message": "string", "id": "integer", "s": "string", "component": "string", "context": "string", }, fmt.Sprintf("%s_regex", tag), uid) parser.Config["Format"] = "regex" parser.Config["Regex"] = `^(?<timestamp>[^ ]*)\s+(?<s>\w)\s+(?<component>[^ ]+)\s+\[(?<context>[^\]]+)]\s+(?<message>.*?) *(?<ms>(\d+))?(:?ms)?$` parser.Config["Key_Name"] = parseKey nestFilters := fluentbit.LuaFilterComponents(tag, fluentbit.ParserNestLuaFunction, fmt.Sprintf(fluentbit.ParserNestLuaScriptContents, parseKey)) parserFilter := fluentbit.Component{ Kind: "FILTER", Config: map[string]string{ "Match": tag, "Name": "parser", "Parser": parserName, "Reserve_Data": "True", "Key_Name": parseKey, }, } mergeFilters := fluentbit.LuaFilterComponents(tag, fluentbit.ParserMergeLuaFunction, fluentbit.ParserMergeLuaScriptContents) c = append(c, parser) c = append(c, nestFilters...) c = append(c, parserFilter) c = append(c, mergeFilters...) return c } type LoggingReceiverMongodb struct { LoggingProcessorMongodb `yaml:",inline"` ReceiverMixin confgenerator.LoggingReceiverFilesMixin `yaml:",inline" validate:"structonly"` } func (r *LoggingReceiverMongodb) Components(ctx context.Context, tag string) []fluentbit.Component { if len(r.ReceiverMixin.IncludePaths) == 0 { r.ReceiverMixin.IncludePaths = []string{ // default logging location "/var/log/mongodb/mongod.log*", } } c := r.ReceiverMixin.Components(ctx, tag) c = append(c, r.LoggingProcessorMongodb.Components(ctx, tag, "mongodb")...) return c } func init() { confgenerator.LoggingReceiverTypes.RegisterType(func() confgenerator.LoggingReceiver { return &LoggingReceiverMongodb{} }) }