connector/elasticapmconnector/config.go (233 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package elasticapmconnector // import "github.com/elastic/opentelemetry-collector-components/connector/elasticapmconnector" import ( "fmt" "time" lsmconfig "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config" signaltometricsconfig "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/config" "go.opentelemetry.io/collector/component" ) var _ component.Config = (*Config)(nil) var defaultIntervals []time.Duration = []time.Duration{ time.Minute, 10 * time.Minute, 60 * time.Minute, } type Config struct { // Aggregation holds configuration related to aggregation of Elastic APM // metrics from other signals. Aggregation *AggregationConfig `mapstructure:"aggregation"` } type AggregationConfig struct { // Directory holds a path to the directory that is used for persisting // aggregation state to disk. If Directory is empty, in-memory storage // is used. Directory string `mapstructure:"directory"` // MetadataKeys holds a list of client.Metadata keys that will be // propagated through to aggregated metrics. Only the listed metadata // keys will be propagated. // // Entries are case-insensitive, and duplicated entries will trigger // a validation error. MetadataKeys []string `mapstructure:"metadata_keys"` // Intervals holds an optional list of time intervals that the processor // will aggregate over. The interval duration must be in increasing // order and must be a factor of the smallest interval duration. // The default aggregation intervals are 1m, 10m and 60m. // // NOTE: these intervals should only be overridden for testing purposes when // faster processor feedback is required. The default intervals should be preferred // in all other cases -- using this configuration may lead to invalid behavior, // and will not be supported. Intervals []time.Duration `mapstructure:"intervals"` } func (cfg Config) Validate() error { lsmConfig := cfg.lsmConfig() return lsmConfig.Validate() } func (cfg Config) lsmConfig() *lsmconfig.Config { intervals := defaultIntervals if cfg.Aggregation != nil && len(cfg.Aggregation.Intervals) != 0 { intervals = cfg.Aggregation.Intervals } intervalsConfig := make([]lsmconfig.IntervalConfig, 0, len(intervals)) for _, i := range intervals { intervalsConfig = append(intervalsConfig, lsmconfig.IntervalConfig{ Duration: i, Statements: []string{ fmt.Sprintf(`set(attributes["metricset.interval"], "%dm")`, int(i.Minutes())), fmt.Sprintf(`set(attributes["data_stream.dataset"], Concat([attributes["metricset.name"], "%dm"], "."))`, int(i.Minutes())), `set(attributes["processor.event"], "metric")`, }, }) } lsmConfig := &lsmconfig.Config{ Intervals: intervalsConfig, ExponentialHistogramMaxBuckets: 160, } if cfg.Aggregation != nil { lsmConfig.Directory = cfg.Aggregation.Directory lsmConfig.MetadataKeys = cfg.Aggregation.MetadataKeys } return lsmConfig } func (cfg Config) signaltometricsConfig() *signaltometricsconfig.Config { // serviceResourceAttributes is the resource attributes included in // service-level aggregated metrics. serviceResourceAttributes := []signaltometricsconfig.Attribute{ {Key: "service.name"}, {Key: "deployment.environment"}, // service.environment {Key: "telemetry.sdk.language"}, // service.language.name // agent.name is set via elastictraceprocessor for traces, // but not for other signals. Default to "unknown" for the // others. { Key: "agent.name", DefaultValue: "unknown", }, } // transactionResourceAttributes is the resource attributes included // in transaction group-level aggregated metrics. transactionResourceAttributes := append([]signaltometricsconfig.Attribute{ {Key: "container.id"}, {Key: "k8s.pod.name"}, {Key: "service.version"}, {Key: "service.instance.id"}, // service.node.name {Key: "process.runtime.name"}, // service.runtime.name {Key: "process.runtime.version"}, // service.runtime.version {Key: "telemetry.sdk.version"}, // service.language.version?? {Key: "host.name"}, {Key: "os.type"}, // host.os.platform {Key: "faas.instance"}, {Key: "faas.name"}, {Key: "faas.version"}, {Key: "cloud.provider"}, {Key: "cloud.region"}, {Key: "cloud.availability_zone"}, {Key: "cloud.platform"}, // cloud.service.name {Key: "cloud.account.id"}, }, serviceResourceAttributes...) serviceSummaryAttributes := []signaltometricsconfig.Attribute{{ Key: "metricset.name", DefaultValue: "service_summary", }} serviceTransactionAttributes := []signaltometricsconfig.Attribute{ {Key: "transaction.root"}, {Key: "transaction.type"}, {Key: "metricset.name", DefaultValue: "service_transaction"}, } transactionAttributes := []signaltometricsconfig.Attribute{ {Key: "transaction.root"}, {Key: "transaction.name"}, {Key: "transaction.type"}, {Key: "transaction.result"}, {Key: "event.outcome"}, {Key: "metricset.name", DefaultValue: "transaction"}, } serviceDestinationAttributes := []signaltometricsconfig.Attribute{ {Key: "span.name"}, {Key: "event.outcome"}, {Key: "service.target.type"}, {Key: "service.target.name"}, {Key: "span.destination.service.resource"}, {Key: "metricset.name", DefaultValue: "service_destination"}, } // TODO switch to exponential histogram once we have optimised // exponential histogram merging. transactionDurationHistogram := &signaltometricsconfig.Histogram{ Buckets: []float64{ 5000, 10000, 25000, 50000, 75000, 100000, 250000, 500000, 750000, 1000000, 2500000, 5000000, 7500000, 10000000, }, Count: "Int(AdjustedCount())", Value: "Microseconds(end_time - start_time)", } transactionDurationSummaryHistogram := &signaltometricsconfig.Histogram{ Buckets: []float64{1}, Count: "Int(AdjustedCount())", Value: "Microseconds(end_time - start_time)", } return &signaltometricsconfig.Config{ Logs: []signaltometricsconfig.MetricInfo{{ Name: "service_summary", IncludeResourceAttributes: serviceResourceAttributes, Attributes: serviceSummaryAttributes, Sum: &signaltometricsconfig.Sum{Value: "1"}, }}, Datapoints: []signaltometricsconfig.MetricInfo{{ Name: "service_summary", IncludeResourceAttributes: serviceResourceAttributes, Attributes: serviceSummaryAttributes, Sum: &signaltometricsconfig.Sum{Value: "1"}, }}, Spans: []signaltometricsconfig.MetricInfo{{ Name: "service_summary", IncludeResourceAttributes: serviceResourceAttributes, Attributes: serviceSummaryAttributes, Sum: &signaltometricsconfig.Sum{ Value: "Int(AdjustedCount())", }, }, { Name: "transaction.duration.histogram", Description: "APM service transaction aggregated metrics as histogram", IncludeResourceAttributes: serviceResourceAttributes, Attributes: append(serviceTransactionAttributes[:], signaltometricsconfig.Attribute{ Key: "elasticsearch.mapping.hints", DefaultValue: []any{"_doc_count"}, }), Unit: "us", Histogram: transactionDurationHistogram, }, { Name: "transaction.duration.summary", Description: "APM service transaction aggregated metrics as summary", IncludeResourceAttributes: serviceResourceAttributes, Attributes: append(serviceTransactionAttributes[:], signaltometricsconfig.Attribute{ Key: "elasticsearch.mapping.hints", DefaultValue: []any{"aggregate_metric_double"}, }), Unit: "us", Histogram: transactionDurationSummaryHistogram, }, { Name: "transaction.duration.histogram", Description: "APM transaction aggregated metrics as histogram", IncludeResourceAttributes: transactionResourceAttributes, Attributes: append(transactionAttributes[:], signaltometricsconfig.Attribute{ Key: "elasticsearch.mapping.hints", DefaultValue: []any{"_doc_count"}, }), Unit: "us", Histogram: transactionDurationHistogram, }, { Name: "transaction.duration.summary", Description: "APM transaction aggregated metrics as summary", IncludeResourceAttributes: transactionResourceAttributes, Attributes: append(transactionAttributes[:], signaltometricsconfig.Attribute{ Key: "elasticsearch.mapping.hints", DefaultValue: []any{"aggregate_metric_double"}, }), Unit: "us", Histogram: transactionDurationSummaryHistogram, }, { Name: "span.destination.service.response_time.sum.us", Description: "APM span destination metrics", IncludeResourceAttributes: serviceResourceAttributes, Attributes: serviceDestinationAttributes, Unit: "us", Sum: &signaltometricsconfig.Sum{ Value: "Double(Microseconds(end_time - start_time))", }, }, { Name: "span.destination.service.response_time.count", Description: "APM span destination metrics", IncludeResourceAttributes: serviceResourceAttributes, Attributes: serviceDestinationAttributes, Sum: &signaltometricsconfig.Sum{ Value: "Int(AdjustedCount())", }, }, { // event.success_count is populated using 2 metric definition with different conditions // and value for the histogram bucket based on event outcome. Both metric definition // are created using same name and attribute and will result in a single histogram. // We use mapping hint of aggregate_metric_double, so, only the sum and the count // values are required and the actual histogram bucket is ignored. Name: "event.success_count", Description: "Success count as a metric for service transaction", IncludeResourceAttributes: serviceResourceAttributes, Attributes: append(serviceTransactionAttributes[:], signaltometricsconfig.Attribute{ Key: "elasticsearch.mapping.hints", DefaultValue: []any{"aggregate_metric_double"}, }), Conditions: []string{ `attributes["event.outcome"] != nil and attributes["event.outcome"] == "success"`, }, Unit: "us", Histogram: &signaltometricsconfig.Histogram{ Buckets: []float64{1}, Count: "Int(AdjustedCount())", Value: "Int(AdjustedCount())", }, }, { Name: "event.success_count", Description: "Success count as a metric for service transaction", IncludeResourceAttributes: serviceResourceAttributes, Attributes: append(serviceTransactionAttributes[:], signaltometricsconfig.Attribute{ Key: "elasticsearch.mapping.hints", DefaultValue: []any{"aggregate_metric_double"}, }), Conditions: []string{ `attributes["event.outcome"] != nil and attributes["event.outcome"] != "success"`, }, Unit: "us", Histogram: &signaltometricsconfig.Histogram{ Buckets: []float64{0}, Count: "Int(AdjustedCount())", Value: "Double(0)", }, }}, } }