apps/elasticsearch.go (250 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apps
import (
"context"
"fmt"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel"
"github.com/GoogleCloudPlatform/ops-agent/internal/secret"
)
// MetricsReceiverElasticsearch configures the "elasticsearch" metrics receiver.
// The embedded confgenerator structs are inlined into this receiver's YAML and
// supply the options that Pipelines reads through CollectionIntervalString,
// TLSConfig, ShouldCollectJVMMetrics and ShouldCollectClusterMetrics.
type MetricsReceiverElasticsearch struct {
confgenerator.ConfigComponent `yaml:",inline"`
confgenerator.MetricsReceiverShared `yaml:",inline"`
confgenerator.MetricsReceiverSharedTLS `yaml:",inline"`
confgenerator.MetricsReceiverSharedCollectJVM `yaml:",inline"`
confgenerator.MetricsReceiverSharedCluster `yaml:",inline"`
// Endpoint is the base URL of the Elasticsearch HTTP API. When set it must be a
// URL starting with "http:" or "https:"; when empty, Pipelines falls back to
// defaultElasticsearchEndpoint.
Endpoint string `yaml:"endpoint" validate:"omitempty,url,startswith=http:|startswith=https:"`
// Username and Password are optional credentials. The validate tags require
// each one to be set whenever the other is.
Username string `yaml:"username" validate:"required_with=Password"`
// Password is handed to the receiver config via SecretValue().
Password secret.String `yaml:"password" validate:"required_with=Username"`
}
// defaultElasticsearchEndpoint is the endpoint Pipelines uses when the
// configuration leaves Endpoint empty.
const defaultElasticsearchEndpoint = "http://localhost:9200"
// Type returns the configuration type name of this receiver, "elasticsearch".
func (MetricsReceiverElasticsearch) Type() string {
	const typeName = "elasticsearch"
	return typeName
}
// Pipelines returns the single metrics pipeline for this receiver. The
// pipeline uses the "elasticsearch" receiver, which scrapes the "_local" node
// at Endpoint (defaultElasticsearchEndpoint when Endpoint is empty). Its
// processors, in order:
//   - normalize sums;
//   - drop the JVM heap series that has no node name;
//   - add the workload.googleapis.com prefix;
//   - set the instrumentation scope.
//
// The returned error is always nil.
func (r MetricsReceiverElasticsearch) Pipelines(_ context.Context) ([]otel.ReceiverPipeline, error) {
	endpoint := r.Endpoint
	if endpoint == "" {
		endpoint = defaultElasticsearchEndpoint
	}

	metrics := map[string]interface{}{}
	// The JMX receiver is not used here, so JVM metrics have to be switched
	// off on the elasticsearch receiver itself.
	if !r.ShouldCollectJVMMetrics() {
		r.skipJVMMetricsConfig(metrics)
	}
	// Index metrics used to be locked behind a feature gate and stay disabled.
	// Once they are supported this call can be removed. The index name
	// resource attribute will then need to be flattened onto the metrics.
	r.disableIndexMetrics(metrics)

	receiver := otel.Component{
		Type: "elasticsearch",
		Config: map[string]interface{}{
			"collection_interval":  r.CollectionIntervalString(),
			"endpoint":             endpoint,
			"username":             r.Username,
			"password":             r.Password.SecretValue(),
			"nodes":                []string{"_local"},
			"tls":                  r.TLSConfig(true),
			"skip_cluster_metrics": !r.ShouldCollectClusterMetrics(),
			"metrics":              metrics,
		},
	}

	metricsProcessors := []otel.Component{
		otel.NormalizeSums(),
		// Cluster metrics carry a summary JVM heap series without a node name.
		// It is not useful and it causes DuplicateTimeSeries errors.
		otel.MetricsOTTLFilter(
			[]string{
				`name == "jvm.memory.heap.used" and resource.attributes["elasticsearch.node.name"] == nil`,
			},
			[]string{},
		),
		otel.MetricsTransform(
			otel.AddPrefix("workload.googleapis.com"),
		),
		otel.ModifyInstrumentationScope(r.Type(), "1.0"),
	}

	return []otel.ReceiverPipeline{{
		Receiver:   receiver,
		Processors: map[string][]otel.Component{"metrics": metricsProcessors},
	}}, nil
}
// skipJVMMetricsConfig disables each JVM metric listed below by writing
// {"enabled": false} for it into metricsConfig. Existing entries with the same
// names are overwritten.
func (r MetricsReceiverElasticsearch) skipJVMMetricsConfig(metricsConfig map[string]interface{}) {
	for _, name := range [...]string{
		"jvm.classes.loaded",
		"jvm.gc.collections.count",
		"jvm.gc.collections.elapsed",
		"jvm.memory.heap.max",
		"jvm.memory.heap.used",
		"jvm.memory.heap.committed",
		"jvm.memory.nonheap.used",
		"jvm.memory.nonheap.committed",
		"jvm.memory.pool.max",
		"jvm.memory.pool.used",
		"jvm.threads.count",
	} {
		metricsConfig[name] = map[string]bool{"enabled": false}
	}
}
// disableIndexMetrics disables each per-index metric listed below by writing
// {"enabled": false} for it into metricsConfig. Existing entries with the same
// names are overwritten.
func (r MetricsReceiverElasticsearch) disableIndexMetrics(metricsConfig map[string]interface{}) {
	for _, name := range [...]string{
		"elasticsearch.index.cache.evictions",
		"elasticsearch.index.cache.memory.usage",
		"elasticsearch.index.cache.size",
		"elasticsearch.index.documents",
		"elasticsearch.index.operations.completed",
		"elasticsearch.index.operations.merge.docs_count",
		"elasticsearch.index.operations.merge.size",
		"elasticsearch.index.operations.time",
		"elasticsearch.index.segments.count",
		"elasticsearch.index.segments.memory",
		"elasticsearch.index.segments.size",
		"elasticsearch.index.shards.size",
		"elasticsearch.index.translog.operations",
		"elasticsearch.index.translog.size",
	} {
		metricsConfig[name] = map[string]bool{"enabled": false}
	}
}
// init registers a constructor for MetricsReceiverElasticsearch with
// confgenerator.MetricsReceiverTypes.
func init() {
	newReceiver := func() confgenerator.MetricsReceiver {
		return &MetricsReceiverElasticsearch{}
	}
	confgenerator.MetricsReceiverTypes.RegisterType(newReceiver)
}
// LoggingProcessorElasticsearchJson is the "elasticsearch_json" logging
// processor, which parses Elasticsearch's JSON-formatted log records.
// It has no options of its own beyond the inlined ConfigComponent.
type LoggingProcessorElasticsearchJson struct {
confgenerator.ConfigComponent `yaml:",inline"`
}
// Type returns the configuration type name of this processor,
// "elasticsearch_json".
func (p LoggingProcessorElasticsearchJson) Type() string {
	const typeName = "elasticsearch_json"
	return typeName
}
// Components returns the fluent-bit components that process one Elasticsearch
// JSON log record. They run in this order:
//   - JSON parsing, using the "timestamp" field as the record time;
//   - severity mapping;
//   - nesting of dotted keys into objects.
//
// Sample log line:
//
//	{"type": "server", "timestamp": "2022-01-17T18:31:47,365Z", "level": "INFO", "component": "o.e.n.Node", "cluster.name": "elasticsearch", "node.name": "ubuntu-jammy", "message": "initialized" }
//
// The exact format depends on the log4j configuration. For the general layout see
// https://artifacts.elastic.co/javadoc/org/elasticsearch/elasticsearch/7.16.2/org/elasticsearch/common/logging/ESJsonLayout.html
// and for general logging configuration see
// https://www.elastic.co/guide/en/elasticsearch/reference/current/logging.html
func (p LoggingProcessorElasticsearchJson) Components(ctx context.Context, tag, uid string) []fluentbit.Component {
	jsonParser := &confgenerator.LoggingProcessorParseJson{
		ParserShared: confgenerator.ParserShared{
			TimeKey:    "timestamp",
			TimeFormat: "%Y-%m-%dT%H:%M:%S,%L%z",
		},
	}
	stages := [][]fluentbit.Component{
		jsonParser.Components(ctx, tag, uid),
		p.severityParser(ctx, tag, uid),
		p.nestingProcessors(ctx, tag, uid),
	}
	components := []fluentbit.Component{}
	for _, stage := range stages {
		components = append(components, stage...)
	}
	return components
}
// severityParser copies jsonPayload.level into the record's "severity" field,
// translating Elasticsearch level names through the table below. It also sets
// the instrumentation source label for this processor type.
// MapValuesExclusive is set. Presumably this means levels missing from the
// table produce no severity; confirm against LoggingProcessorModifyFields.
func (p LoggingProcessorElasticsearchJson) severityParser(ctx context.Context, tag, uid string) []fluentbit.Component {
	levelToSeverity := map[string]string{
		"TRACE":       "DEBUG",
		"DEBUG":       "DEBUG",
		"INFO":        "INFO",
		"WARN":        "WARNING",
		"DEPRECATION": "WARNING",
		"ERROR":       "ERROR",
		"CRITICAL":    "ERROR",
		"FATAL":       "FATAL",
	}
	fields := map[string]*confgenerator.ModifyField{
		"severity": {
			CopyFrom:           "jsonPayload.level",
			MapValues:          levelToSeverity,
			MapValuesExclusive: true,
		},
		InstrumentationSourceLabel: instrumentationSourceValue(p.Type()),
	}
	modify := confgenerator.LoggingProcessorModifyFields{Fields: fields}
	return modify.Components(ctx, tag, uid)
}
// nestingProcessors returns one nest-wildcard stage per known key prefix.
// Each stage moves keys named "<prefix>.<rest>" under an object called
// <prefix> and strips "<prefix>." from their names.
//
// Most prefixes come from the audit event attributes:
// https://www.elastic.co/guide/en/elasticsearch/reference/7.16/audit-event-types.html#audit-event-attributes
// Non-audit logs use the layout documented here, which adds the "cluster" prefix:
// https://artifacts.elastic.co/javadoc/org/elasticsearch/elasticsearch/7.16.2/org/elasticsearch/common/logging/ESJsonLayout.html
//
// More specific prefixes come before their parents (for example "user.run_by"
// before "user"), presumably so nested objects are built inside-out. Keep
// this order.
func (p LoggingProcessorElasticsearchJson) nestingProcessors(ctx context.Context, tag, uid string) []fluentbit.Component {
	prefixes := [...]string{
		"user.run_by",
		"user.run_as",
		"authentication.token",
		"node",
		"event",
		"authentication",
		"user",
		"origin",
		"request",
		"url",
		"host",
		"apikey",
		"cluster",
	}
	components := make([]fluentbit.Component, 0, len(prefixes))
	for i := range prefixes {
		prefix := prefixes[i]
		nest := confgenerator.LoggingProcessorNestWildcard{
			Wildcard:     fmt.Sprintf("%s.*", prefix),
			NestUnder:    prefix,
			RemovePrefix: fmt.Sprintf("%s.", prefix),
		}
		components = append(components, nest.Components(ctx, tag, uid)...)
	}
	return components
}
// LoggingProcessorElasticsearchGC is the "elasticsearch_gc" logging processor,
// which parses the JVM GC log lines written by Elasticsearch.
// It has no options of its own beyond the inlined ConfigComponent.
type LoggingProcessorElasticsearchGC struct {
confgenerator.ConfigComponent `yaml:",inline"`
}
// Type returns the configuration type name of this processor,
// "elasticsearch_gc".
func (p LoggingProcessorElasticsearchGC) Type() string {
	const typeName = "elasticsearch_gc"
	return typeName
}
// Components returns the fluent-bit components that parse one JVM GC log line
// from Elasticsearch and set the instrumentation source label for this
// processor type.
//
// Sample log lines (positive and negative UTC offsets):
//
//	[2022-01-17T18:31:37.240+0000][652141][gc,start    ] GC(0) Pause Young (Normal) (G1 Evacuation Pause)
//	[2022-01-17T13:31:37.240-0500][652141][gc,start    ] GC(0) Pause Young (Normal) (G1 Evacuation Pause)
//
// Captured fields:
//   - time: the leading timestamp, used as the record time.
//   - type: the comma-separated tag list, for example "gc,start". Trailing
//     padding is excluded.
//   - gc_run: the cycle number from "GC(n)", when present, typed as an integer.
//   - message: the rest of the line.
//
// The second bracketed number is matched but not captured. The sample suggests
// it is a process id; confirm against the JVM -Xlog decorators in use.
func (p LoggingProcessorElasticsearchGC) Components(ctx context.Context, tag, uid string) []fluentbit.Component {
	regexParser := confgenerator.LoggingProcessorParseRegex{
		// Notes on the regex:
		//   - The fractional-seconds dot is escaped.
		//   - The UTC offset accepts "+" or "-". A JVM logging local time west
		//     of UTC writes offsets such as "-0500", and those lines previously
		//     failed to match.
		//   - The tag class is spelled out as [A-Za-z_,]. The earlier [A-z,]
		//     range also matched '[', '\', ']', '^' and '`'. The underscore,
		//     which that range included, is kept.
		Regex: `\[(?<time>\d+-\d+-\d+T\d+:\d+:\d+\.\d+[+-]\d+)\]\[\d+\]\[(?<type>[A-Za-z_,]+)\s*\]\s*(?:GC\((?<gc_run>\d+)\))?\s*(?<message>.*)`,
		ParserShared: confgenerator.ParserShared{
			TimeKey: "time",
			// %z parses a signed +hhmm/-hhmm offset, matching the regex above.
			TimeFormat: "%Y-%m-%dT%H:%M:%S.%L%z",
			Types: map[string]string{
				"gc_run": "integer",
			},
		},
	}

	c := []fluentbit.Component{}
	c = append(c, regexParser.Components(ctx, tag, uid)...)
	c = append(c,
		confgenerator.LoggingProcessorModifyFields{
			Fields: map[string]*confgenerator.ModifyField{
				InstrumentationSourceLabel: instrumentationSourceValue(p.Type()),
			},
		}.Components(ctx, tag, uid)...,
	)
	return c
}
// LoggingReceiverElasticsearchJson reads the Elasticsearch JSON log files
// selected by ReceiverMixin.IncludePaths and parses each record with
// LoggingProcessorElasticsearchJson. The options of both embedded parts are
// inlined into the receiver's YAML.
type LoggingReceiverElasticsearchJson struct {
LoggingProcessorElasticsearchJson `yaml:",inline"`
ReceiverMixin confgenerator.LoggingReceiverFilesMixin `yaml:",inline"`
}
// Components returns the file-input components followed by the
// elasticsearch_json parsing components. When IncludePaths is empty, the
// default Elasticsearch JSON log files under /var/log/elasticsearch are read.
func (r LoggingReceiverElasticsearchJson) Components(ctx context.Context, tag string) []fluentbit.Component {
	mixin := r.ReceiverMixin
	if len(mixin.IncludePaths) == 0 {
		mixin.IncludePaths = []string{
			"/var/log/elasticsearch/*_server.json",
			"/var/log/elasticsearch/*_deprecation.json",
			"/var/log/elasticsearch/*_index_search_slowlog.json",
			"/var/log/elasticsearch/*_index_indexing_slowlog.json",
			"/var/log/elasticsearch/*_audit.json",
		}
	}

	// When Elasticsearch logs a stack trace, one JSON record spans several
	// lines, for example:
	//
	//	{"type": "server", ..., "message": "uncaught exception in thread [main]",
	//	"stacktrace": ["org.elasticsearch.bootstrap.StartupException: ...",
	//	-- snip --
	//	"at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:166) ~[elasticsearch-7.16.2.jar:7.16.2]",
	//	"... 6 more"] }
	//
	// A record starts on a line beginning with "{". Each following line that
	// does not begin with "{" and ends with "," or "}" continues it.
	mixin.MultilineRules = []confgenerator.MultilineRule{
		{StateName: "start_state", NextState: "cont", Regex: `^{.*`},
		{StateName: "cont", NextState: "cont", Regex: `^[^{].*[,}]$`},
	}

	inputs := mixin.Components(ctx, tag)
	return append(inputs, r.LoggingProcessorElasticsearchJson.Components(ctx, tag, "elasticsearch_json")...)
}
// LoggingReceiverElasticsearchGC reads the Elasticsearch GC log files selected
// by ReceiverMixin.IncludePaths and parses each line with
// LoggingProcessorElasticsearchGC. The options of both embedded parts are
// inlined into the receiver's YAML.
type LoggingReceiverElasticsearchGC struct {
LoggingProcessorElasticsearchGC `yaml:",inline"`
ReceiverMixin confgenerator.LoggingReceiverFilesMixin `yaml:",inline"`
}
// Components returns the file-input components followed by the
// elasticsearch_gc parsing components. When IncludePaths is empty,
// /var/log/elasticsearch/gc.log is read.
func (r LoggingReceiverElasticsearchGC) Components(ctx context.Context, tag string) []fluentbit.Component {
	mixin := r.ReceiverMixin
	if len(mixin.IncludePaths) == 0 {
		mixin.IncludePaths = []string{"/var/log/elasticsearch/gc.log"}
	}
	inputs := mixin.Components(ctx, tag)
	return append(inputs, r.LoggingProcessorElasticsearchGC.Components(ctx, tag, "elasticsearch_gc")...)
}
// init registers constructors for the Elasticsearch JSON and GC logging
// receivers with confgenerator.LoggingReceiverTypes, JSON first.
func init() {
	constructors := []func() confgenerator.LoggingReceiver{
		func() confgenerator.LoggingReceiver { return &LoggingReceiverElasticsearchJson{} },
		func() confgenerator.LoggingReceiver { return &LoggingReceiverElasticsearchGC{} },
	}
	for _, newReceiver := range constructors {
		confgenerator.LoggingReceiverTypes.RegisterType(newReceiver)
	}
}