apps/zookeeper.go (130 lines of code) (raw):

// Copyright 2022 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package apps import ( "context" "github.com/GoogleCloudPlatform/ops-agent/confgenerator" "github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit" "github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel" ) func init() { confgenerator.MetricsReceiverTypes.RegisterType(func() confgenerator.MetricsReceiver { return &MetricsReceiverZookeeper{} }) } type MetricsReceiverZookeeper struct { confgenerator.ConfigComponent `yaml:",inline"` confgenerator.MetricsReceiverShared `yaml:",inline"` Endpoint string `yaml:"endpoint" validate:"omitempty,hostname_port"` } const defaultZookeeperEndpoint = "localhost:2181" func (MetricsReceiverZookeeper) Type() string { return "zookeeper" } func (r MetricsReceiverZookeeper) Pipelines(_ context.Context) ([]otel.ReceiverPipeline, error) { if r.Endpoint == "" { r.Endpoint = defaultZookeeperEndpoint } return []otel.ReceiverPipeline{{ Receiver: otel.Component{ Type: "zookeeper", Config: map[string]interface{}{ "collection_interval": r.CollectionIntervalString(), "endpoint": r.Endpoint, }, }, Processors: map[string][]otel.Component{"metrics": { otel.NormalizeSums(), otel.MetricsTransform( otel.AddPrefix("workload.googleapis.com"), ), otel.ModifyInstrumentationScope(r.Type(), "1.0"), }}, }}, nil } type LoggingProcessorZookeeperGeneral struct { confgenerator.ConfigComponent `yaml:",inline"` } func (LoggingProcessorZookeeperGeneral) Type() string { return "zookeeper_general" } func (p LoggingProcessorZookeeperGeneral) Components(ctx context.Context, tag, uid string) []fluentbit.Component { c := []fluentbit.Component{} complexRegex := confgenerator.LoggingProcessorParseRegexComplex{ Parsers: []confgenerator.RegexParser{ { // Sample log line: 2022-01-31 17:51:45,451 [myid:1] - INFO [NIOWorkerThread-3:NIOServerCnxn@514] - Processing mntr command from /0:0:0:0:0:0:0:1:50284 // Sample log line: 2022-02-01 00:46:33,626 [myid:1] - WARN [SendWorker:2:QuorumCnxManager$SendWorker@1283] - Interrupted while waiting for message on queue // Sample log line: java.lang.InterruptedException // Sample log line: at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2056) // Sample log line: at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2133) // Sample log line: at org.apache.zookeeper.util.CircularBlockingQueue.poll(CircularBlockingQueue.java:105) // Sample log line: at org.apache.zookeeper.server.quorum.QuorumCnxManager.pollSendQueue(QuorumCnxManager.java:1448) // Sample log line: at org.apache.zookeeper.server.quorum.QuorumCnxManager.access$900(QuorumCnxManager.java:99) // Sample log line: at org.apache.zookeeper.server.quorum.QuorumCnxManager$SendWorker.run(QuorumCnxManager.java:1272) Regex: `^(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\s\[myid:(?<myid>\d+)?\]\s-\s(?<level>\w+)\s+\[(?<thread>.+):(?<source>.+)@(?<line>\d+)\]\s+-\s*(?<message>[\S\s]*)`, Parser: confgenerator.ParserShared{ TimeKey: "time", TimeFormat: "%Y-%m-%d %H:%M:%S,%L", Types: map[string]string{ "myid": "integer", "thread": "string", "source": "string", "line": "integer", }, }, }, { // Sample log line: 2022-01-31 17:51:45,451 - INFO [NIOWorkerThread-3:NIOServerCnxn@514] - Processing mntr command from /0:0:0:0:0:0:0:1:50284 // Sample log line: 2022-02-01 00:46:33,626 - WARN [SendWorker:2:QuorumCnxManager$SendWorker@1283] - Interrupted while waiting for message on queue // Sample log line: java.lang.InterruptedException // Sample log line: at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2056) // Sample log line: at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2133) // Sample log line: at org.apache.zookeeper.util.CircularBlockingQueue.poll(CircularBlockingQueue.java:105) // Sample log line: at org.apache.zookeeper.server.quorum.QuorumCnxManager.pollSendQueue(QuorumCnxManager.java:1448) // Sample log line: at org.apache.zookeeper.server.quorum.QuorumCnxManager.access$900(QuorumCnxManager.java:99) // Sample log line: at org.apache.zookeeper.server.quorum.QuorumCnxManager$SendWorker.run(QuorumCnxManager.java:1272) Regex: `^(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\s-\s(?<level>\w+)\s+\[(?<thread>.+):(?<source>.+)@(?<line>\d+)\]\s+-\s*(?<message>[\S\s]*)`, Parser: confgenerator.ParserShared{ TimeKey: "time", TimeFormat: "%Y-%m-%d %H:%M:%S,%L", Types: map[string]string{ "thread": "string", "source": "string", "line": "integer", }, }, }, }, } c = append(c, complexRegex.Components(ctx, tag, uid)...) c = append(c, severityParser(ctx, p.Type(), tag, uid)...) return c } type LoggingReceiverZookeeperGeneral struct { LoggingProcessorZookeeperGeneral `yaml:",inline"` ReceiverMixin confgenerator.LoggingReceiverFilesMixin `yaml:",inline"` } func (r LoggingReceiverZookeeperGeneral) Components(ctx context.Context, tag string) []fluentbit.Component { if len(r.ReceiverMixin.IncludePaths) == 0 { // Default log for Zookeeper. r.ReceiverMixin.IncludePaths = []string{ "/opt/zookeeper/logs/zookeeper-*.out", "/var/log/zookeeper/zookeeper.log", } } r.ReceiverMixin.MultilineRules = []confgenerator.MultilineRule{ { StateName: "start_state", NextState: "cont", Regex: `^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3}`, }, { StateName: "cont", NextState: "cont", Regex: `^(?!\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})`, }, } c := r.ReceiverMixin.Components(ctx, tag) return append(c, r.LoggingProcessorZookeeperGeneral.Components(ctx, tag, "zookeeper_general")...) } func severityParser(ctx context.Context, processorType, tag, uid string) []fluentbit.Component { return confgenerator.LoggingProcessorModifyFields{ Fields: map[string]*confgenerator.ModifyField{ "severity": { CopyFrom: "jsonPayload.level", MapValues: map[string]string{ "TRACE": "DEBUG", "DEBUG": "DEBUG", "INFO": "INFO", "WARN": "WARNING", "ERROR": "ERROR", "CRITICAL": "ERROR", "FATAL": "FATAL", }, MapValuesExclusive: true, }, InstrumentationSourceLabel: instrumentationSourceValue(processorType), }, }.Components(ctx, tag, uid) } func init() { confgenerator.LoggingReceiverTypes.RegisterType(func() confgenerator.LoggingReceiver { return &LoggingReceiverZookeeperGeneral{} }) }