apps/kafka.go (121 lines of code) (raw):

// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package apps import ( "context" "github.com/GoogleCloudPlatform/ops-agent/confgenerator" "github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit" "github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel" ) type MetricsReceiverKafka struct { confgenerator.ConfigComponent `yaml:",inline"` confgenerator.MetricsReceiverSharedJVM `yaml:",inline"` confgenerator.MetricsReceiverSharedCollectJVM `yaml:",inline"` } const defaultKafkaEndpoint = "localhost:9999" func (r MetricsReceiverKafka) Type() string { return "kafka" } func (r MetricsReceiverKafka) Pipelines(_ context.Context) ([]otel.ReceiverPipeline, error) { targetSystem := "kafka" return r.MetricsReceiverSharedJVM. WithDefaultEndpoint(defaultKafkaEndpoint). ConfigurePipelines( r.TargetSystemString(targetSystem), []otel.Component{ // Kafka script contains several metrics not desired by ops-agent // as it existed in opentelemetry-java-contrib prior to the // development of this integration otel.MetricsFilter( "include", "strict", "kafka.message.count", "kafka.request.count", "kafka.request.failed", "kafka.request.time.total", "kafka.network.io", "kafka.purgatory.size", "kafka.partition.count", "kafka.partition.offline", "kafka.partition.under_replicated", "kafka.isr.operation.count", ), otel.NormalizeSums(), otel.MetricsTransform( otel.AddPrefix("workload.googleapis.com"), ), otel.ModifyInstrumentationScope(r.Type(), "1.0"), }, ) } func init() { confgenerator.MetricsReceiverTypes.RegisterType(func() confgenerator.MetricsReceiver { return &MetricsReceiverKafka{} }) } type LoggingProcessorKafka struct { confgenerator.ConfigComponent `yaml:",inline"` } func (LoggingProcessorKafka) Type() string { return "kafka" } func (p LoggingProcessorKafka) Components(ctx context.Context, tag string, uid string) []fluentbit.Component { c := confgenerator.LoggingProcessorParseMultilineRegex{ LoggingProcessorParseRegexComplex: confgenerator.LoggingProcessorParseRegexComplex{ Parsers: []confgenerator.RegexParser{ { // Sample line: [2022-01-26 18:25:20,466] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=18000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@22ff4249 (org.apache.zookeeper.ZooKeeper) // Sample line: [2022-01-26 18:25:20,485] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient) // Sample line: [2022-02-01 21:34:19,045] INFO [BrokerToControllerChannelManager broker=0 name=alterIsr]: Recorded new controller, from now on will use broker sam-test-kafka.c.otel-agent-dev.internal:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread) // Sample line: [2022-02-01 21:34:21,230] INFO [ExpirationReaper-0-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) // Sample line: [2022-02-01 21:34:26,063] INFO [LogLoader partition=quickstart-events-1, dir=/tmp/kafka-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log$) // Sample line: [2022-01-26 18:25:20,462] INFO Client environment:java.class.path=/opt/kafka/bin/../libs/activation-1.1.1.jar:/opt/kafka/bin/... (org.apache.zookeeper.ZooKeeper) // Sample line: [2022-01-26 18:25:21,107] INFO KafkaConfig values: // advertised.listeners = null // alter.config.policy.class.name = null // alter.log.dirs.replication.quota.window.num = 11 // alter.log.dirs.replication.quota.window.size.seconds = 1 Regex: `^\[(?<time>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d+)\]\s+(?<level>[A-Z]+)(?:\s+\[(?<source>.*)\]:?)?\s+(?<message>[\s\S]*)(?=\s+\([\w\s\.\$]+\)$|\s+$)(?:\s+\((?<logger>[\w\s\.\$]+)\))?`, Parser: confgenerator.ParserShared{ TimeKey: "time", TimeFormat: "%Y-%m-%d %H:%M:%S,%L", }, }, }, }, Rules: []confgenerator.MultilineRule{ { StateName: "start_state", NextState: "cont", Regex: `^\[\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d+\]`, }, { StateName: "cont", NextState: "cont", Regex: `^(?!\[\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d+\])`, }, }, }.Components(ctx, tag, uid) // Log levels are just log4j log levels // https://logging.apache.org/log4j/2.x/log4j-api/apidocs/org/apache/logging/log4j/Level.html c = append(c, confgenerator.LoggingProcessorModifyFields{ Fields: map[string]*confgenerator.ModifyField{ "severity": { CopyFrom: "jsonPayload.level", MapValues: map[string]string{ "TRACE": "TRACE", "DEBUG": "DEBUG", "INFO": "INFO", "ERROR": "ERROR", "WARN": "WARNING", "FATAL": "CRITICAL", }, MapValuesExclusive: true, }, InstrumentationSourceLabel: instrumentationSourceValue(p.Type()), }, }.Components(ctx, tag, uid)..., ) return c } type LoggingReceiverKafka struct { LoggingProcessorKafka `yaml:",inline"` ReceiverMixin confgenerator.LoggingReceiverFilesMixin `yaml:",inline" validate:"structonly"` } func (r LoggingReceiverKafka) Components(ctx context.Context, tag string) []fluentbit.Component { if len(r.ReceiverMixin.IncludePaths) == 0 { r.ReceiverMixin.IncludePaths = []string{ // No default package installers, these are common log paths from installs online "/var/log/kafka/*.log", "/opt/kafka/logs/server.log", "/opt/kafka/logs/controller.log", } } c := r.ReceiverMixin.Components(ctx, tag) c = append(c, r.LoggingProcessorKafka.Components(ctx, tag, "kafka")...) return c } func init() { confgenerator.LoggingProcessorTypes.RegisterType(func() confgenerator.LoggingProcessor { return &LoggingProcessorKafka{} }) confgenerator.LoggingReceiverTypes.RegisterType(func() confgenerator.LoggingReceiver { return &LoggingReceiverKafka{} }) }