apps/rabbitmq.go (127 lines of code) (raw):

// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package apps import ( "context" "github.com/GoogleCloudPlatform/ops-agent/confgenerator" "github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit" "github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel" "github.com/GoogleCloudPlatform/ops-agent/internal/secret" ) type LoggingProcessorRabbitmq struct { confgenerator.ConfigComponent `yaml:",inline"` } func (*LoggingProcessorRabbitmq) Type() string { return "rabbitmq" } func (p *LoggingProcessorRabbitmq) Components(ctx context.Context, tag, uid string) []fluentbit.Component { c := confgenerator.LoggingProcessorParseRegexComplex{ Parsers: []confgenerator.RegexParser{ { // Sample log line: // 2022-01-31 18:01:20.441571+00:00 [erro] <0.692.0> ** Connection attempt from node 'rabbit_ctl_17@keith-testing-rabbitmq' rejected. Invalid challenge reply. ** Regex: `^(?<timestamp>\d+-\d+-\d+\s+\d+:\d+:\d+[.,]\d+\+\d+:\d+) \[(?<severity>\w+)\] \<(?<process_id>\d+\.\d+\.\d+)\> (?<message>.*)$`, Parser: confgenerator.ParserShared{ TimeKey: "timestamp", TimeFormat: "%Y-%m-%d %H:%M:%S.%L%z", }, }, { // Sample log line: // 2023-02-01 12:45:14.705 [info] <0.801.0> Successfully set user tags for user 'admin' to [administrator] Regex: `^(?<timestamp>\d+-\d+-\d+\s+\d+:\d+:\d+[.,]\d+\d+\d+) \[(?<severity>\w+)\] \<(?<process_id>\d+\.\d+\.\d+)\> (?<message>.*)$`, Parser: confgenerator.ParserShared{ TimeKey: "timestamp", TimeFormat: "%Y-%m-%d %H:%M:%S.%L", }, }, }, }.Components(ctx, tag, uid) // severities documented here: https://www.rabbitmq.com/logging.html#log-levels c = append(c, confgenerator.LoggingProcessorModifyFields{ Fields: map[string]*confgenerator.ModifyField{ "severity": { CopyFrom: "jsonPayload.severity", MapValues: map[string]string{ "debug": "DEBUG", "warning": "WARNING", "error": "ERROR", "info": "INFO", "noti": "DEFAULT", }, MapValuesExclusive: true, }, InstrumentationSourceLabel: instrumentationSourceValue(p.Type()), }, }.Components(ctx, tag, uid)..., ) return c } type LoggingReceiverRabbitmq struct { LoggingProcessorRabbitmq `yaml:",inline"` ReceiverMixin confgenerator.LoggingReceiverFilesMixin `yaml:",inline" validate:"structonly"` } func (r LoggingReceiverRabbitmq) Components(ctx context.Context, tag string) []fluentbit.Component { if len(r.ReceiverMixin.IncludePaths) == 0 { r.ReceiverMixin.IncludePaths = []string{ "/var/log/rabbitmq/rabbit*.log", } } // Some multiline entries related to crash logs are important to capture and end in // // 2022-01-31 18:07:43.557042+00:00 [erro] <0.130.0> // BOOT FAILED // =========== // ERROR: could not bind to distribution port 25672, it is in use by another node: rabbit@keith-testing-rabbitmq // r.ReceiverMixin.MultilineRules = []confgenerator.MultilineRule{ { StateName: "start_state", NextState: "cont", Regex: `^\d+-\d+-\d+ \d+:\d+:\d+\.\d+\+\d+:\d+`, }, { StateName: "cont", NextState: "cont", Regex: `^(?!\d+-\d+-\d+ \d+:\d+:\d+\.\d+\+\d+:\d+)`, }, } c := r.ReceiverMixin.Components(ctx, tag) c = append(c, r.LoggingProcessorRabbitmq.Components(ctx, tag, "rabbitmq")...) return c } func init() { confgenerator.LoggingReceiverTypes.RegisterType(func() confgenerator.LoggingReceiver { return &LoggingReceiverRabbitmq{} }) } type MetricsReceiverRabbitmq struct { confgenerator.ConfigComponent `yaml:",inline"` confgenerator.MetricsReceiverShared `yaml:",inline"` confgenerator.MetricsReceiverSharedTLS `yaml:",inline"` Password secret.String `yaml:"password" validate:"required"` Username string `yaml:"username" validate:"required"` Endpoint string `yaml:"endpoint" validate:"omitempty,url"` } const defaultRabbitmqTCPEndpoint = "http://localhost:15672" func (r MetricsReceiverRabbitmq) Type() string { return "rabbitmq" } func (r MetricsReceiverRabbitmq) Pipelines(_ context.Context) ([]otel.ReceiverPipeline, error) { if r.Endpoint == "" { r.Endpoint = defaultRabbitmqTCPEndpoint } cfg := map[string]interface{}{ "collection_interval": r.CollectionIntervalString(), "endpoint": r.Endpoint, "username": r.Username, "password": r.Password.SecretValue(), "tls": r.TLSConfig(true), } return []otel.ReceiverPipeline{{ Receiver: otel.Component{ Type: "rabbitmq", Config: cfg, }, Processors: map[string][]otel.Component{"metrics": { otel.NormalizeSums(), otel.MetricsTransform( otel.AddPrefix("workload.googleapis.com"), ), otel.TransformationMetrics( otel.FlattenResourceAttribute("rabbitmq.queue.name", "queue_name"), otel.FlattenResourceAttribute("rabbitmq.node.name", "node_name"), otel.FlattenResourceAttribute("rabbitmq.vhost.name", "vhost_name"), ), otel.ModifyInstrumentationScope(r.Type(), "1.0"), }}, }}, nil } func init() { confgenerator.MetricsReceiverTypes.RegisterType(func() confgenerator.MetricsReceiver { return &MetricsReceiverRabbitmq{} }) }