apps/cassandra.go (224 lines of code) (raw):
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apps
import (
"context"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel"
)
type MetricsReceiverCassandra struct {
confgenerator.ConfigComponent `yaml:",inline"`
confgenerator.MetricsReceiverSharedJVM `yaml:",inline"`
confgenerator.MetricsReceiverSharedCollectJVM `yaml:",inline"`
}
const defaultCassandraEndpoint = "localhost:7199"
func (r MetricsReceiverCassandra) Type() string {
return "cassandra"
}
func (r MetricsReceiverCassandra) Pipelines(_ context.Context) ([]otel.ReceiverPipeline, error) {
targetSystem := "cassandra"
return r.MetricsReceiverSharedJVM.
WithDefaultEndpoint(defaultCassandraEndpoint).
ConfigurePipelines(
r.TargetSystemString(targetSystem),
[]otel.Component{
otel.NormalizeSums(),
otel.MetricsTransform(
otel.AddPrefix("workload.googleapis.com"),
),
otel.ModifyInstrumentationScope(r.Type(), "1.0"),
},
)
}
func init() {
confgenerator.MetricsReceiverTypes.RegisterType(func() confgenerator.MetricsReceiver { return &MetricsReceiverCassandra{} })
}
type LoggingProcessorCassandraSystem struct {
confgenerator.ConfigComponent `yaml:",inline"`
}
func (LoggingProcessorCassandraSystem) Type() string {
return "cassandra_system"
}
func (p LoggingProcessorCassandraSystem) Components(ctx context.Context, tag string, uid string) []fluentbit.Component {
return javaLogParsingComponents(ctx, p.Type(), tag, uid)
}
type LoggingProcessorCassandraDebug struct {
confgenerator.ConfigComponent `yaml:",inline"`
}
func (LoggingProcessorCassandraDebug) Type() string {
return "cassandra_debug"
}
func (p LoggingProcessorCassandraDebug) Components(ctx context.Context, tag string, uid string) []fluentbit.Component {
return javaLogParsingComponents(ctx, p.Type(), tag, uid)
}
func javaLogParsingComponents(ctx context.Context, processorType, tag, uid string) []fluentbit.Component {
c := confgenerator.LoggingProcessorParseMultilineRegex{
LoggingProcessorParseRegexComplex: confgenerator.LoggingProcessorParseRegexComplex{
Parsers: []confgenerator.RegexParser{
{
// Sample line: INFO [IndexSummaryManager:1] 2021-10-07 12:57:05,003 IndexSummaryRedistribution.java:83 - Redistributing index summaries
// Sample line: WARN [main] 2021-10-07 11:57:01,602 StartupChecks.java:329 - Maximum number of memory map areas per process (vm.max_map_count) 65530 is too low, recommended value: 1048575, you can change it with sysctl.
// Sample line: ERROR [MemtablePostFlush:2] 2021-10-05 01:03:35,424 CassandraDaemon.java:579 - Exception in thread Thread[MemtablePostFlush:2,5,main]
// org.apache.cassandra.io.FSReadError: java.io.IOException: Invalid folder descriptor trying to create log replica /folder/views-9786ac1cdd583201a7cdad556410c985
// at org.apache.cassandra.db.lifecycle.LogReplica.create(LogReplica.java:59)
// at org.apache.cassandra.db.lifecycle.LogReplicaSet.maybeCreateReplica(LogReplicaSet.java:87)
// at org.apache.cassandra.db.lifecycle.LogFile.makeAddRecord(LogFile.java:336)
// at org.apache.cassandra.db.lifecycle.LogFile.add(LogFile.java:310)
Regex: `^(?<level>[A-Z]+)\s+\[(?<module>[^\]]+)\]\s+(?<time>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d+)\s+(?<message>(?:(?<javaClass>[\w\.]+):(?<lineNumber>\d+))?[\S\s]+)`,
Parser: confgenerator.ParserShared{
TimeKey: "time",
TimeFormat: "%Y-%m-%d %H:%M:%S,%L",
Types: map[string]string{
"lineNumber": "integer",
},
},
},
},
},
Rules: []confgenerator.MultilineRule{
{
StateName: "start_state",
NextState: "cont",
Regex: `^[A-Z]+\s+\[[^\]]+\] \d+`,
},
{
StateName: "cont",
NextState: "cont",
Regex: `^(?![A-Z]+\s+\[[^\]]+\] \d+)`,
},
},
}.Components(ctx, tag, uid)
// Best documentation found for log levels:
// https://docs.datastax.com/en/cassandra-oss/3.0/cassandra/configuration/configLoggingLevels.html#Loglevels
c = append(c,
confgenerator.LoggingProcessorModifyFields{
Fields: map[string]*confgenerator.ModifyField{
"severity": {
CopyFrom: "jsonPayload.level",
MapValues: map[string]string{
"TRACE": "TRACE",
"DEBUG": "DEBUG",
"INFO": "INFO",
"ERROR": "ERROR",
"WARN": "WARNING",
},
MapValuesExclusive: true,
},
InstrumentationSourceLabel: instrumentationSourceValue(processorType),
},
}.Components(ctx, tag, uid)...,
)
return c
}
type LoggingProcessorCassandraGC struct {
confgenerator.ConfigComponent `yaml:",inline"`
}
func (LoggingProcessorCassandraGC) Type() string {
return "cassandra_gc"
}
func (p LoggingProcessorCassandraGC) Components(ctx context.Context, tag string, uid string) []fluentbit.Component {
c := confgenerator.LoggingProcessorParseMultilineRegex{
LoggingProcessorParseRegexComplex: confgenerator.LoggingProcessorParseRegexComplex{
Parsers: []confgenerator.RegexParser{
{
// Compatible with Java versions pre-11
// Vast majority of lines look like the first, with time stopped & time stopping
// Sample line: 2021-10-02T04:18:28.284+0000: 3.315: Total time for which application threads were stopped: 0.0002390 seconds, Stopping threads took: 0.0000281 seconds
// Sample line: 2021-10-05T01:20:52.695+0000: 4.434: [GC (CMS Initial Mark) [1 CMS-initial-mark: 0K(3686400K)] 36082K(4055040K), 0.0130057 secs] [Times: user=0.04 sys=0.00, real=0.01 secs]
// Sample line: 2021-10-05T01:20:52.741+0000: 4.481: [CMS-concurrent-preclean-start]
// Lines may also contain more detailed GC Heap information in the following lines
Regex: `^(?<time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3,6}(?:Z|[+-]\d{2}:?\d{2})):\s+(?<uptime>\d+\.\d{3,6}):\s+(?<message>(?:Total time for which application threads were stopped: (?<timeStopped>\d+\.\d+) seconds, Stopping threads took: (?<timeStopping>\d+\.\d+)[\s\S]*|[\s\S]+))`,
Parser: confgenerator.ParserShared{
TimeKey: "time",
TimeFormat: "%Y-%m-%dT%H:%M:%S.%L%z",
Types: map[string]string{
"uptime": "float",
"timeStopped": "float",
"timeStopping": "float",
},
},
},
{
// Compatible with Java versions 11+ (see https://bugs.openjdk.org/browse/JDK-8046148)
// Vast majority of lines look like the first, with time stopped & time stopping
// Sample line: [2023-05-16T14:51:23.332+0000][4.595s][5195][5217][info ] Total time for which application threads were stopped: 0.0003091 seconds, Stopping threads took: 0.0000181 seconds
// Sample line: [2023-05-16T14:51:23.332+0000][4.595s][5195][5216][info ] GC(1) Concurrent Abortable Preclean 540.253ms
// Sample line: [2023-05-16T14:51:23.332+0000][4.595s][5195][5217][info ] Application time: 0.0001203 seconds
// Lines may also contain more detailed GC Heap information in the following lines
Regex: `^\[(?<time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3,6}(?:Z|[+-]\d{2}:?\d{2}))\]\s?\[(?<uptime>\d+\.\d{3,6})s?\]\s?\[(?<pid>\d+)\]\s?\[(?<tid>\d+)\]\s?\[(?<level>\w+)\s?\]\s?(?<message>(?:Total time for which application threads were stopped: (?<timeStopped>\d+\.\d+) seconds, Stopping threads took: (?<timeStopping>\d+\.\d+)[\s\S]*|[\s\S]+))`,
Parser: confgenerator.ParserShared{
TimeKey: "time",
TimeFormat: "%Y-%m-%dT%H:%M:%S.%L%z",
Types: map[string]string{
"uptime": "float",
"pid": "integer",
"tid": "integer",
"timeStopped": "float",
"timeStopping": "float",
},
},
},
},
},
Rules: []confgenerator.MultilineRule{
{
StateName: "start_state",
NextState: "cont",
Regex: `^\[?\d{4}-\d{2}-\d{2}`,
},
{
StateName: "cont",
NextState: "cont",
Regex: `^(?!\[?\d{4}-\d{2}-\d{2})`,
},
},
}.Components(ctx, tag, uid)
// Java11+ gc logs have severity in the log line
// https://bugs.openjdk.org/browse/JDK-8046148
c = append(c,
confgenerator.LoggingProcessorModifyFields{
Fields: map[string]*confgenerator.ModifyField{
"severity": {
CopyFrom: "jsonPayload.level",
MapValues: map[string]string{
"develop": "TRACE",
"trace": "TRACE",
"debug": "DEBUG",
"info": "INFO",
"error": "ERROR",
"warning": "WARNING",
},
MapValuesExclusive: true,
},
InstrumentationSourceLabel: instrumentationSourceValue(p.Type()),
},
}.Components(ctx, tag, uid)...,
)
return c
}
type LoggingReceiverCassandraSystem struct {
LoggingProcessorCassandraSystem `yaml:",inline"`
ReceiverMixin confgenerator.LoggingReceiverFilesMixin `yaml:",inline" validate:"structonly"`
}
func (r LoggingReceiverCassandraSystem) Components(ctx context.Context, tag string) []fluentbit.Component {
if len(r.ReceiverMixin.IncludePaths) == 0 {
r.ReceiverMixin.IncludePaths = []string{
// Default log file path on Debian / Ubuntu / RHEL / CentOS
"/var/log/cassandra/system*.log",
// No default install position / log path for SLES
}
}
c := r.ReceiverMixin.Components(ctx, tag)
c = append(c, r.LoggingProcessorCassandraSystem.Components(ctx, tag, "cassandra_system")...)
return c
}
type LoggingReceiverCassandraDebug struct {
LoggingProcessorCassandraDebug `yaml:",inline"`
ReceiverMixin confgenerator.LoggingReceiverFilesMixin `yaml:",inline" validate:"structonly"`
}
func (r LoggingReceiverCassandraDebug) Components(ctx context.Context, tag string) []fluentbit.Component {
if len(r.ReceiverMixin.IncludePaths) == 0 {
r.ReceiverMixin.IncludePaths = []string{
// Default log file path on Debian / Ubuntu / RHEL / CentOS
"/var/log/cassandra/debug*.log",
// No default install position / log path for SLES
}
}
c := r.ReceiverMixin.Components(ctx, tag)
c = append(c, r.LoggingProcessorCassandraDebug.Components(ctx, tag, "cassandra_debug")...)
return c
}
type LoggingReceiverCassandraGC struct {
LoggingProcessorCassandraGC `yaml:",inline"`
ReceiverMixin confgenerator.LoggingReceiverFilesMixin `yaml:",inline" validate:"structonly"`
}
func (r LoggingReceiverCassandraGC) Components(ctx context.Context, tag string) []fluentbit.Component {
if len(r.ReceiverMixin.IncludePaths) == 0 {
r.ReceiverMixin.IncludePaths = []string{
// Default log file path on Debian / Ubuntu / RHEL / CentOS for JDK 8
"/var/log/cassandra/gc.log.*.current",
// Default log file path on Debian / Ubuntu / RHEL / CentOS for JDK 11
"/var/log/cassandra/gc.log",
// No default install position / log path for SLES
}
}
c := r.ReceiverMixin.Components(ctx, tag)
c = append(c, r.LoggingProcessorCassandraGC.Components(ctx, tag, "cassandra_gc")...)
return c
}
func init() {
confgenerator.LoggingProcessorTypes.RegisterType(func() confgenerator.LoggingProcessor { return &LoggingProcessorCassandraSystem{} })
confgenerator.LoggingProcessorTypes.RegisterType(func() confgenerator.LoggingProcessor { return &LoggingProcessorCassandraDebug{} })
confgenerator.LoggingProcessorTypes.RegisterType(func() confgenerator.LoggingProcessor { return &LoggingProcessorCassandraGC{} })
confgenerator.LoggingReceiverTypes.RegisterType(func() confgenerator.LoggingReceiver { return &LoggingReceiverCassandraSystem{} })
confgenerator.LoggingReceiverTypes.RegisterType(func() confgenerator.LoggingReceiver { return &LoggingReceiverCassandraDebug{} })
confgenerator.LoggingReceiverTypes.RegisterType(func() confgenerator.LoggingReceiver { return &LoggingReceiverCassandraGC{} })
}