apps/vault.go (310 lines of code) (raw):

// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package apps import ( "context" "fmt" "github.com/GoogleCloudPlatform/ops-agent/confgenerator" "github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit" "github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel" "github.com/GoogleCloudPlatform/ops-agent/internal/secret" ) type MetricsReceiverVault struct { confgenerator.ConfigComponent `yaml:",inline"` confgenerator.MetricsReceiverShared `yaml:",inline"` confgenerator.MetricsReceiverSharedTLS `yaml:",inline"` Token secret.String `yaml:"token"` Endpoint string `yaml:"endpoint" validate:"omitempty,hostname_port"` MetricsPath string `yaml:"metrics_path" validate:"omitempty,startswith=/"` Scheme string `yaml:"scheme" validate:"omitempty"` } const ( defaultVaultEndpoint = "localhost:8200" defaultVaultMetricsPath = "/v1/sys/metrics" defaultVaultScheme = "http" storageLabel = "storage" ) func (r MetricsReceiverVault) getOperationList() []string { return []string{ "delete", "get", "list", "put", } } func (r MetricsReceiverVault) getStorageList() []string { return []string{ "azure", "cassandra", "cockroachdb", "consul", "couchdb", "dynamodb", "etcd", "gcs", "mssql", "mysql", "postgres", "s3", "spanner", "swift", "zookeeper", } } func (r MetricsReceiverVault) Type() string { return "vault" } func (r MetricsReceiverVault) Pipelines(_ context.Context) ([]otel.ReceiverPipeline, error) { if r.Endpoint == "" { r.Endpoint = defaultVaultEndpoint } if r.MetricsPath == "" { r.MetricsPath = defaultVaultMetricsPath } if r.Scheme == "" { r.Scheme = defaultVaultScheme } tlsConfig := r.TLSConfig(true) scrapeConfig := map[string]interface{}{ "job_name": "vault", "scrape_interval": r.CollectionIntervalString(), "metrics_path": r.MetricsPath, "static_configs": []map[string]interface{}{{ "targets": []string{r.Endpoint}, }}, "scheme": r.Scheme, } if r.Token != "" { scrapeConfig["authorization"] = map[string]interface{}{ "credentials": r.Token.SecretValue(), "type": "Bearer", } } if tlsConfig["insecure"] == false { scrapeConfig["scheme"] = "https" } delete(tlsConfig, "insecure") scrapeConfig["tls_config"] = tlsConfig includeMetrics := []string{} queries := []otel.TransformQuery{} storageMetricTransforms, newStorageMetricNames := r.addStorageMetrics() metricRenewRevokeTransforms, newRenewRevokeNames := r.getSummarySumMetricsTransforms() metricDetailTransforms, newMetricNames := r.getMetricTransforms() includeMetrics = append(includeMetrics, newStorageMetricNames...) includeMetrics = append(includeMetrics, newMetricNames...) includeMetrics = append(includeMetrics, newRenewRevokeNames...) queries = append(queries, storageMetricTransforms...) queries = append(queries, metricRenewRevokeTransforms...) queries = append(queries, metricDetailTransforms...) return []otel.ReceiverPipeline{{ Receiver: otel.Component{ Type: "prometheus", Config: map[string]interface{}{ "config": map[string]interface{}{ "scrape_configs": []map[string]interface{}{ scrapeConfig, }, }, }, }, Processors: map[string][]otel.Component{"metrics": { otel.TransformationMetrics(queries...), otel.MetricsFilter( "include", "strict", includeMetrics..., ), // This is currently needed along side the newer transform processor as the new processor doesn't currently support toggling scalar data types. // Issue: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/11810 otel.MetricsTransform( otel.UpdateMetric( "vault.audit.response.failed", otel.ToggleScalarDataType, ), otel.UpdateMetric( "vault.audit.request.failed", otel.ToggleScalarDataType, ), otel.UpdateMetric( "vault.token.lease.count", otel.ToggleScalarDataType, ), otel.UpdateMetric( "vault.token.count", otel.ToggleScalarDataType, ), otel.UpdateMetric( "vault.core.request.count", otel.ToggleScalarDataType, ), ), otel.NormalizeSums(), otel.MetricsTransform( otel.AddPrefix("workload.googleapis.com"), ), otel.ModifyInstrumentationScope(r.Type(), "1.0"), }}, }}, nil } type metricTransformer struct { OldName string NewName string Description string Unit string Monotonic bool } func (r MetricsReceiverVault) addStorageMetrics() (transforms []otel.TransformQuery, newMetrics []string) { storages := r.getStorageList() operations := r.getOperationList() queries := []otel.TransformQuery{} for _, operation := range operations { operationTimeName := "vault.storage.operation." + operation + ".time" operationCountName := "vault.storage.operation." + operation + ".count" newMetrics = append(newMetrics, operationTimeName) newMetrics = append(newMetrics, operationCountName) for _, storage := range storages { oldMetric := "vault_" + storage + "_" + operation queries = append(queries, otel.SummaryCountValToSum(oldMetric, "cumulative", true)) queries = append(queries, otel.SummarySumValToSum(oldMetric, "cumulative", true)) queries = append(queries, otel.SetAttribute(oldMetric+"_count", storageLabel, storage)) queries = append(queries, otel.SetAttribute(oldMetric+"_sum", storageLabel, storage)) queries = append(queries, otel.SetName(oldMetric+"_sum", operationTimeName)) queries = append(queries, otel.SetUnit(operationTimeName, "ms")) queries = append(queries, otel.SetDescription(operationTimeName, fmt.Sprintf("The duration of %s operations executed against the storage backend.", operation))) queries = append(queries, otel.SetName(oldMetric+"_count", operationCountName)) queries = append(queries, otel.SetUnit(operationCountName, "{operations}")) queries = append(queries, otel.SetDescription(operationCountName, fmt.Sprintf("The amount of %s operations executed against the storage backend.", operation))) } } return queries, newMetrics } func (r MetricsReceiverVault) getSummarySumMetricsTransforms() (queries []otel.TransformQuery, newNames []string) { metricTransformers := []metricTransformer{ { OldName: "vault_expire_revoke", NewName: "vault.token.revoke.time", Description: "The average time taken to revoke a token.", Unit: "ms", Monotonic: true, }, { OldName: "vault_expire_renew", NewName: "vault.token.renew.time", Description: "The average time taken to renew a token.", Unit: "ms", Monotonic: true, }, { OldName: "vault_core_leadership_lost", NewName: "vault.core.leader.duration", Description: "The amount of time a core was the leader in high availability mode.", Unit: "ms", Monotonic: false, }, } for _, metric := range metricTransformers { queries = append(queries, otel.SummarySumValToSum(metric.OldName, "cumulative", metric.Monotonic)) queries = append(queries, otel.SetName(metric.OldName+"_sum", metric.NewName)) queries = append(queries, otel.SetUnit(metric.NewName, metric.NewName)) queries = append(queries, otel.SetDescription(metric.NewName, metric.Description)) newNames = append(newNames, metric.NewName) } return queries, newNames } func (r MetricsReceiverVault) getMetricTransforms() (queries []otel.TransformQuery, newNames []string) { metricTransformers := []metricTransformer{ { OldName: "vault_core_in_flight_requests", NewName: "vault.core.request.count", Description: "The number of requests handled by the Vault core.", Unit: "{requests}", }, { OldName: "vault_expire_num_leases", NewName: "vault.token.lease.count", Description: "The number of tokens that are leased for eventual expiration.", Unit: "{tokens}", }, { OldName: "vault_audit_log_request_failure", NewName: "vault.audit.request.failed", Description: "The number of audit log requests that have failed.", Unit: "{requests}", }, { OldName: "vault_audit_log_response_failure", NewName: "vault.audit.response.failed", Description: "The number of audit log responses that have failed.", Unit: "{responses}", }, { OldName: "vault_runtime_sys_bytes", NewName: "vault.memory.usage", Description: "The amount of memory used by Vault.", Unit: "bytes", }, { OldName: "vault_token_count", NewName: "vault.token.count", Description: "The number of tokens created.", Unit: "{tokens}", }, } for _, metric := range metricTransformers { if metric.OldName != "" { newNames = append(newNames, metric.NewName) queries = append(queries, otel.SetName(metric.OldName, metric.NewName)) } queries = append(queries, otel.SetDescription(metric.NewName, metric.Description)) queries = append(queries, otel.SetUnit(metric.NewName, metric.Unit)) } return queries, newNames } func init() { confgenerator.MetricsReceiverTypes.RegisterType(func() confgenerator.MetricsReceiver { return &MetricsReceiverVault{} }) } type LoggingProcessorVaultJson struct { confgenerator.ConfigComponent `yaml:",inline"` } func (LoggingProcessorVaultJson) Type() string { return "vault_audit" } func (p LoggingProcessorVaultJson) Components(ctx context.Context, tag, uid string) []fluentbit.Component { c := []fluentbit.Component{} // sample log line: // {"time":"2022-06-07T20:34:34.392078404Z","type":"request","auth":{"token_type":"default"},"request":{"id":"aa005196-0280-381d-ebeb-1a083bdf5675","operation":"update","namespace":{"id":"root"},"path":"sys/audit/test"}} jsonParser := &confgenerator.LoggingProcessorParseJson{ ParserShared: confgenerator.ParserShared{ TimeKey: "time", TimeFormat: "%Y-%m-%dT%H:%M:%S.%L%z", }, } c = append(c, confgenerator.LoggingProcessorModifyFields{ Fields: map[string]*confgenerator.ModifyField{ InstrumentationSourceLabel: instrumentationSourceValue(p.Type()), }, }.Components(ctx, tag, uid)..., ) c = append(c, jsonParser.Components(ctx, tag, uid)...) return c } type LoggingReceiverVaultAuditJson struct { LoggingProcessorVaultJson `yaml:",inline"` ReceiverMixin confgenerator.LoggingReceiverFilesMixin `yaml:",inline"` IncludePaths []string `yaml:"include_paths,omitempty" validate:"required"` } func (r LoggingReceiverVaultAuditJson) Components(ctx context.Context, tag string) []fluentbit.Component { r.ReceiverMixin.IncludePaths = r.IncludePaths r.ReceiverMixin.MultilineRules = []confgenerator.MultilineRule{ { StateName: "start_state", NextState: "cont", Regex: `^{.*`, }, { StateName: "cont", NextState: "cont", Regex: `^(?!{.*)`, }, } c := r.ReceiverMixin.Components(ctx, tag) return append(c, r.LoggingProcessorVaultJson.Components(ctx, tag, r.LoggingProcessorVaultJson.Type())...) } func init() { confgenerator.LoggingReceiverTypes.RegisterType(func() confgenerator.LoggingReceiver { return &LoggingReceiverVaultAuditJson{} }) }