apps/couchbase.go (374 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apps
import (
"context"
"fmt"
"sort"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel"
"github.com/GoogleCloudPlatform/ops-agent/internal/secret"
)
// MetricsReceiverCouchbase is the struct for ops agent monitoring metrics for couchbase.
// Pipelines turns it into a prometheus scrape job that authenticates with basic auth.
type MetricsReceiverCouchbase struct {
confgenerator.ConfigComponent `yaml:",inline"`
confgenerator.MetricsReceiverShared `yaml:",inline"`
// Endpoint is the host:port to scrape. Validation allows it to be omitted, in
// which case Pipelines targets defaultCouchbaseEndpoint instead.
Endpoint string `yaml:"endpoint" validate:"omitempty,hostname_port"`
// Username is required; Pipelines uses it as the basic_auth username of the scrape job.
Username string `yaml:"username" validate:"required"`
// Password is required; Pipelines uses its secret value as the basic_auth password.
Password secret.String `yaml:"password" validate:"required"`
}
// defaultCouchbaseEndpoint is the scrape target Pipelines uses when the
// receiver's Endpoint is left empty.
const defaultCouchbaseEndpoint = "localhost:8091"
// Type reports the configuration type key that identifies the couchbase
// metrics receiver.
func (r MetricsReceiverCouchbase) Type() string {
	const receiverType = "couchbase"
	return receiverType
}
// Pipelines builds the single receiver pipeline for couchbase metrics: a
// prometheus receiver that scrapes the configured endpoint (or
// defaultCouchbaseEndpoint when none is set) using basic auth, followed by
// processors that rename the scraped series to
// workload.googleapis.com/couchbase.* metrics and attach descriptions and units.
//
// The context argument is unused and the returned error is always nil.
func (r MetricsReceiverCouchbase) Pipelines(_ context.Context) ([]otel.ReceiverPipeline, error) {
	targets := []string{r.Endpoint}
	if r.Endpoint == "" {
		targets = []string{defaultCouchbaseEndpoint}
	}

	// Prometheus fully anchors relabel regexes, so with the "keep" action only
	// series whose name appears in this alternation survive the scrape. Every
	// metric renamed in the transform below therefore has to be listed here.
	// kv_ep_tmp_oom_errors used to be missing, which silently dropped the
	// "recoverable" half of couchbase.bucket.error.oom.count.
	const keptMetrics = "(kv_ops)|" +
		"(kv_vb_curr_items)|" +
		"(kv_num_vbuckets)|" +
		"(kv_total_memory_used_bytes)|" +
		"(kv_ep_num_value_ejects)|" +
		"(kv_ep_mem_high_wat)|" +
		"(kv_ep_mem_low_wat)|" +
		"(kv_ep_tmp_oom_errors)|" +
		"(kv_ep_oom_errors)"

	config := map[string]interface{}{
		"config": map[string]interface{}{
			"scrape_configs": []map[string]interface{}{
				{
					"job_name":        r.Type(),
					"scrape_interval": r.CollectionIntervalString(),
					"basic_auth": map[string]interface{}{
						"username": r.Username,
						"password": r.Password.SecretValue(),
					},
					"metric_relabel_configs": []map[string]interface{}{
						{
							"source_labels": []string{"__name__"},
							"regex":         keptMetrics,
							"action":        "keep",
						},
					},
					"static_configs": []map[string]interface{}{
						{
							"targets": targets,
						},
					},
				},
			},
		},
	}

	return []otel.ReceiverPipeline{{
		Receiver: otel.Component{
			Type:   "prometheus",
			Config: config,
		},
		Processors: map[string][]otel.Component{"metrics": {
			otel.NormalizeSums(),
			// remove prometheus scraping meta-metrics
			otel.MetricsFilter("exclude", "strict",
				"scrape_samples_post_metric_relabeling",
				"scrape_series_added",
				"scrape_duration_seconds",
				"scrape_samples_scraped",
				"up",
			),
			otel.MetricsTransform(
				// renaming from prometheus style to otel style, order is important before workload prefix
				otel.RenameMetric(
					"kv_ops",
					"couchbase.bucket.operation.count",
					otel.ToggleScalarDataType,
					otel.RenameLabel("bucket", "bucket_name"),
				),
				otel.RenameMetric(
					"kv_vb_curr_items",
					"couchbase.bucket.item.count",
					otel.RenameLabel("bucket", "bucket_name"),
				),
				otel.RenameMetric(
					"kv_num_vbuckets",
					"couchbase.bucket.vbucket.count",
					otel.RenameLabel("bucket", "bucket_name"),
				),
				otel.RenameMetric(
					"kv_total_memory_used_bytes",
					"couchbase.bucket.memory.usage",
					otel.RenameLabel("bucket", "bucket_name"),
				),
				otel.RenameMetric(
					"kv_ep_num_value_ejects",
					"couchbase.bucket.item.ejection.count",
					otel.ToggleScalarDataType,
					otel.RenameLabel("bucket", "bucket_name"),
				),
				otel.RenameMetric(
					"kv_ep_mem_high_wat",
					"couchbase.bucket.memory.high_water_mark.limit",
					otel.RenameLabel("bucket", "bucket_name"),
				),
				otel.RenameMetric(
					"kv_ep_mem_low_wat",
					"couchbase.bucket.memory.low_water_mark.limit",
					otel.RenameLabel("bucket", "bucket_name"),
				),
				otel.RenameMetric(
					"kv_ep_tmp_oom_errors",
					"couchbase.bucket.error.oom.count.recoverable",
					otel.ToggleScalarDataType,
					otel.RenameLabel("bucket", "bucket_name"),
				),
				otel.RenameMetric(
					"kv_ep_oom_errors",
					"couchbase.bucket.error.oom.count.unrecoverable",
					otel.ToggleScalarDataType,
					otel.RenameLabel("bucket", "bucket_name"),
				),
				// combine OOM metrics
				otel.CombineMetrics(
					`^couchbase\.bucket\.error\.oom\.count\.(?P<error_type>unrecoverable|recoverable)$$`,
					"couchbase.bucket.error.oom.count",
				),
				// group by bucket and op
				otel.UpdateMetric(
					`couchbase.bucket.operation.count`,
					map[string]interface{}{
						"action":           "aggregate_labels",
						"label_set":        []string{"bucket_name", "op"},
						"aggregation_type": "sum",
					},
				),
				otel.AddPrefix("workload.googleapis.com"),
			),
			// Using the transform processor for metrics
			otel.TransformationMetrics(r.transformMetrics()...),
			otel.ModifyInstrumentationScope(r.Type(), "1.0"),
		}},
	}}, nil
}
// couchbaseMetric holds the per-metric metadata that transformMetrics turns
// into transform queries.
type couchbaseMetric struct {
// description is the text passed to otel.SetDescription for the metric.
description string
// castToSum, when true, makes transformMetrics emit otel.ConvertGaugeToSum for the metric.
castToSum bool
// unit is the unit string passed to otel.SetUnit for the metric.
unit string
}
// metrics maps each fully prefixed couchbase metric name to the description,
// unit and optional gauge-to-sum conversion that transformMetrics applies to it.
// Entries that set castToSum are converted from gauge to sum before their
// description and unit are set.
var metrics = map[string]couchbaseMetric{
"workload.googleapis.com/couchbase.bucket.operation.count": {
description: "Number of operations on the bucket.",
castToSum: true,
unit: "{operations}",
},
"workload.googleapis.com/couchbase.bucket.item.count": {
description: "Number of items that belong to the bucket.",
unit: "{items}",
},
"workload.googleapis.com/couchbase.bucket.vbucket.count": {
description: "Number of non-resident vBuckets.",
unit: "{vbuckets}",
},
"workload.googleapis.com/couchbase.bucket.memory.usage": {
description: "Usage of total memory available to the bucket.",
unit: "By",
},
"workload.googleapis.com/couchbase.bucket.item.ejection.count": {
description: "Number of item value ejections from memory to disk.",
castToSum: true,
unit: "{ejections}",
},
"workload.googleapis.com/couchbase.bucket.error.oom.count": {
description: "Number of out of memory errors.",
castToSum: true,
unit: "{errors}",
},
"workload.googleapis.com/couchbase.bucket.memory.high_water_mark.limit": {
description: "The memory usage at which items will be ejected.",
unit: "By",
},
"workload.googleapis.com/couchbase.bucket.memory.low_water_mark.limit": {
description: "The memory usage at which ejections will stop that were previously triggered by a high water mark breach.",
unit: "By",
},
}
// transformMetrics returns the transform queries for every entry of the
// metrics table: an optional gauge-to-sum conversion followed by the
// description and unit assignments, emitted in sorted metric-name order.
func (r MetricsReceiverCouchbase) transformMetrics() []otel.TransformQuery {
	// Go randomizes map iteration, so walk the names in sorted order to keep
	// the generated config stable between runs.
	names := make([]string, 0, len(metrics))
	for name := range metrics {
		names = append(names, name)
	}
	sort.Strings(names)

	// At most three queries are produced per metric.
	result := make([]otel.TransformQuery, 0, 3*len(names))
	for _, name := range names {
		meta := metrics[name]
		if meta.castToSum {
			result = append(result, otel.ConvertGaugeToSum(name))
		}
		result = append(result,
			otel.SetDescription(name, meta.description),
			otel.SetUnit(name, meta.unit),
		)
	}
	return result
}
// init registers the couchbase metrics receiver with the confgenerator
// metrics receiver type registry.
func init() {
	confgenerator.MetricsReceiverTypes.RegisterType(func() confgenerator.MetricsReceiver {
		return new(MetricsReceiverCouchbase)
	})
}
// LoggingReceiverCouchbase is a struct used for generating the fluentbit components for the
// general couchbase logs.
type LoggingReceiverCouchbase struct {
confgenerator.ConfigComponent `yaml:",inline"`
// ReceiverMixin carries the inlined file receiver settings; Components fills in
// default include paths when none are configured.
ReceiverMixin confgenerator.LoggingReceiverFilesMixin `yaml:",inline" validate:"structonly"`
}
// Type reports the type key that identifies the general couchbase logs receiver.
func (lr LoggingReceiverCouchbase) Type() string {
	const receiverType = "couchbase_general"
	return receiverType
}
// Components returns the fluentbit components for the general couchbase logs:
// the components produced by the files mixin, a multiline regex parser for the
// bracketed log header, and a field modification that maps the parsed level to
// a severity and sets the instrumentation source label.
func (lr LoggingReceiverCouchbase) Components(ctx context.Context, tag string) []fluentbit.Component {
	// lr is a value receiver, so defaulting the include paths here only
	// affects this call's copy of the configuration.
	if len(lr.ReceiverMixin.IncludePaths) == 0 {
		defaultPaths := []string{
			"/opt/couchbase/var/lib/couchbase/logs/couchdb.log",
			"/opt/couchbase/var/lib/couchbase/logs/info.log",
			"/opt/couchbase/var/lib/couchbase/logs/debug.log",
			"/opt/couchbase/var/lib/couchbase/logs/error.log",
			"/opt/couchbase/var/lib/couchbase/logs/babysitter.log",
		}
		lr.ReceiverMixin.IncludePaths = defaultPaths
	}
	out := lr.ReceiverMixin.Components(ctx, tag)

	// A record starts on a line opening with "[<type>:"; any line that does
	// not open that way is folded into the preceding record.
	multiline := confgenerator.LoggingProcessorParseMultilineRegex{
		LoggingProcessorParseRegexComplex: confgenerator.LoggingProcessorParseRegexComplex{
			Parsers: []confgenerator.RegexParser{
				{
					Regex: `^\[(?<type>[^:]*):(?<level>[^,]*),(?<timestamp>\d+-\d+-\d+T\d+:\d+:\d+.\d+Z),(?<node_name>[^:]*):([^:]+):(?<source>[^\]]+)\](?<message>.*)$`,
					Parser: confgenerator.ParserShared{
						TimeKey:    "timestamp",
						TimeFormat: "%Y-%m-%dT%H:%M:%S.%L",
					},
				},
			},
		},
		Rules: []confgenerator.MultilineRule{
			{
				StateName: "start_state",
				NextState: "cont",
				Regex:     `^\[([^\s+:]*):`,
			},
			{
				StateName: "cont",
				NextState: "cont",
				Regex:     `^(?!\[([^\s+:]*):).*$`,
			},
		},
	}
	out = append(out, multiline.Components(ctx, tag, lr.Type())...)

	severity := confgenerator.LoggingProcessorModifyFields{
		Fields: map[string]*confgenerator.ModifyField{
			"severity": {
				CopyFrom: "jsonPayload.level",
				MapValues: map[string]string{
					"debug": "DEBUG",
					"info":  "INFO",
					"warn":  "WARNING",
					"error": "ERROR",
				},
				MapValuesExclusive: true,
			},
			InstrumentationSourceLabel: instrumentationSourceValue(lr.Type()),
		},
	}
	return append(out, severity.Components(ctx, tag, lr.Type())...)
}
// LoggingProcessorCouchbaseHTTPAccess is a struct that will generate the fluentbit components for the http access logs.
// Despite the "Processor" in its name, it is registered as a logging receiver in init.
type LoggingProcessorCouchbaseHTTPAccess struct {
confgenerator.ConfigComponent `yaml:",inline"`
// ReceiverMixin carries the inlined file receiver settings; Components fills in
// default include paths when none are configured.
ReceiverMixin confgenerator.LoggingReceiverFilesMixin `yaml:",inline" validate:"structonly"`
}
// Type reports the type key that identifies the couchbase http access logs receiver.
func (lp LoggingProcessorCouchbaseHTTPAccess) Type() string {
	const receiverType = "couchbase_http_access"
	return receiverType
}
// Components returns the fluentbit components for the http access logs of couchbase:
// the components produced by the files mixin, a regex parser for the access-log
// line, and a field modification that moves the parsed http_request_* fields
// into the httpRequest structure and sets the instrumentation source label.
func (lp LoggingProcessorCouchbaseHTTPAccess) Components(ctx context.Context, tag string) []fluentbit.Component {
	// lp is a value receiver, so defaulting the include paths here only
	// affects this call's copy of the configuration.
	if len(lp.ReceiverMixin.IncludePaths) == 0 {
		lp.ReceiverMixin.IncludePaths = []string{
			"/opt/couchbase/var/lib/couchbase/logs/http_access.log",
			"/opt/couchbase/var/lib/couchbase/logs/http_access_internal.log",
		}
	}
	c := lp.ReceiverMixin.Components(ctx, tag)
	// TODO: Harden the genericAccessLogParser so it can be used. It didn't work here since there are some minor differences with the
	// referer fields and there are additional fields after the user agent here but not in the other apps.
	c = append(c,
		confgenerator.LoggingProcessorParseRegex{
			Regex: `^(?<http_request_remoteIp>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<timestamp>[^\]]*)\] "(?<http_request_requestMethod>\S+) (?<http_request_requestUrl>\S+) (?<http_request_protocol>\S+)" (?<http_request_status>[^ ]*) (?<http_request_responseSize>[^ ]*\S+) (?<http_request_referer>[^ ]*) "(?<http_request_userAgent>[^\"]*)" (?<message>.*)$`,
			ParserShared: confgenerator.ParserShared{
				TimeKey:    "timestamp",
				TimeFormat: `%d/%b/%Y:%H:%M:%S %z`,
				// Type casts are keyed by capture-group name. The previous keys
				// ("size" and "code") matched no group in Regex, so the integer
				// casts never applied; they now name the status and response
				// size groups that the regex actually captures.
				Types: map[string]string{
					"http_request_responseSize": "integer",
					"http_request_status":       "integer",
				},
			},
		}.Components(ctx, tag, lp.Type())...,
	)

	mf := confgenerator.LoggingProcessorModifyFields{
		Fields: map[string]*confgenerator.ModifyField{
			InstrumentationSourceLabel: instrumentationSourceValue(lp.Type()),
		},
	}
	// Generate the httpRequest structure.
	for _, field := range []string{
		"remoteIp",
		"requestMethod",
		"requestUrl",
		"protocol",
		"status",
		"responseSize",
		"referer",
		"userAgent",
	} {
		dest := fmt.Sprintf("httpRequest.%s", field)
		src := fmt.Sprintf("jsonPayload.http_request_%s", field)
		mf.Fields[dest] = &confgenerator.ModifyField{
			MoveFrom: src,
		}
		// A referer logged as "-" is a placeholder, so that field is omitted.
		if field == "referer" {
			mf.Fields[dest].OmitIf = fmt.Sprintf(`%s = "-"`, src)
		}
	}
	c = append(c, mf.Components(ctx, tag, lp.Type())...)
	return c
}
// LoggingProcessorCouchbaseGOXDCR is a struct that will generate the fluentbit components for the goxdcr logs.
// Despite the "Processor" in its name, it is registered as a logging receiver in init.
type LoggingProcessorCouchbaseGOXDCR struct {
confgenerator.ConfigComponent `yaml:",inline"`
// ReceiverMixin carries the inlined file receiver settings; Components fills in
// a default include path when none is configured.
ReceiverMixin confgenerator.LoggingReceiverFilesMixin `yaml:",inline" validate:"structonly"`
}
// Type reports the type key that identifies the couchbase cross datacenter
// (goxdcr) logs receiver.
func (lg LoggingProcessorCouchbaseGOXDCR) Type() string {
	const receiverType = "couchbase_goxdcr"
	return receiverType
}
// Components returns the fluentbit components for the couchbase goxdcr logs:
// the components produced by the files mixin, a multiline regex parser keyed
// on the leading timestamp, and a field modification that maps the parsed
// level to a severity and sets the instrumentation source label.
func (lg LoggingProcessorCouchbaseGOXDCR) Components(ctx context.Context, tag string) []fluentbit.Component {
	// lg is a value receiver, so defaulting the include path here only
	// affects this call's copy of the configuration.
	if len(lg.ReceiverMixin.IncludePaths) == 0 {
		defaultPaths := []string{
			"/opt/couchbase/var/lib/couchbase/logs/goxdcr.log",
		}
		lg.ReceiverMixin.IncludePaths = defaultPaths
	}
	out := lg.ReceiverMixin.Components(ctx, tag)

	// A record starts on a line that begins with a timestamp; lines that do
	// not begin with one are folded into the preceding record.
	multiline := confgenerator.LoggingProcessorParseMultilineRegex{
		LoggingProcessorParseRegexComplex: confgenerator.LoggingProcessorParseRegexComplex{
			Parsers: []confgenerator.RegexParser{
				{
					Regex: `^(?<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d*Z) (?<level>\w+) (?<log_type>\w+.\w+): (?<message>.*)$`,
					Parser: confgenerator.ParserShared{
						TimeKey:    "timestamp",
						TimeFormat: "%Y-%m-%dT%H:%M:%S.%L",
					},
				},
			},
		},
		Rules: []confgenerator.MultilineRule{
			{
				StateName: "start_state",
				NextState: "cont",
				Regex:     `^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}`,
			},
			{
				StateName: "cont",
				NextState: "cont",
				Regex:     `^(?!\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})`,
			},
		},
	}
	out = append(out, multiline.Components(ctx, tag, lg.Type())...)

	severity := confgenerator.LoggingProcessorModifyFields{
		Fields: map[string]*confgenerator.ModifyField{
			"severity": {
				CopyFrom: "jsonPayload.level",
				MapValues: map[string]string{
					"DEBUG": "DEBUG",
					"INFO":  "INFO",
					"WARN":  "WARNING",
					"ERROR": "ERROR",
				},
				MapValuesExclusive: true,
			},
			InstrumentationSourceLabel: instrumentationSourceValue(lg.Type()),
		},
	}
	return append(out, severity.Components(ctx, tag, lg.Type())...)
}
// init registers the three couchbase logging receivers (general, http access
// and goxdcr) with the confgenerator logging receiver type registry.
func init() {
	confgenerator.LoggingReceiverTypes.RegisterType(func() confgenerator.LoggingReceiver {
		return new(LoggingReceiverCouchbase)
	})
	confgenerator.LoggingReceiverTypes.RegisterType(func() confgenerator.LoggingReceiver {
		return new(LoggingProcessorCouchbaseHTTPAccess)
	})
	confgenerator.LoggingReceiverTypes.RegisterType(func() confgenerator.LoggingReceiver {
		return new(LoggingProcessorCouchbaseGOXDCR)
	})
}