confgenerator/otel/processors.go (433 lines of code) (raw):

// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package otel import ( "fmt" "path" "sort" "strings" "github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel/ottl" ) // Helper functions to easily build up processor configs. // MetricsFilter returns a Component that filters metrics. // polarity should be "include" or "exclude". // matchType should be "strict" or "regexp". func MetricsFilter(polarity, matchType string, metricNames ...string) Component { return Component{ Type: "filter", Config: map[string]interface{}{ "metrics": map[string]interface{}{ polarity: map[string]interface{}{ "match_type": matchType, "metric_names": metricNames, }, }, }, } } // MetricsOTTLFilter returns a Component that filters metrics using OTTL. // OTTL can only be used as an exclude filter, any metrics or datapoints that match // one of the provided queries are dropped. // Example query: 'name == "jvm.memory.heap.used" and resource.attributes["elasticsearch.node.name"] == nil' func MetricsOTTLFilter(metricQueries []string, datapointQueries []string) Component { metricsConfig := map[string]interface{}{} if len(metricQueries) > 0 { metricsConfig["metric"] = metricQueries } if len(datapointQueries) > 0 { metricsConfig["datapoint"] = metricQueries } return Component{ Type: "filter", Config: map[string]interface{}{ "metrics": metricsConfig, }, } } // MetricsTransform returns a Component that performs the transformations specified as arguments. 
func MetricsTransform(metrics ...map[string]interface{}) Component {
	return Component{
		Type: "metricstransform",
		Config: map[string]interface{}{
			"transforms": metrics,
		},
	}
}

// NormalizeSums returns a Component that performs counter normalization.
func NormalizeSums() Component {
	return Component{
		Type:   "normalizesums",
		Config: map[string]interface{}{},
	}
}

// CastToSum returns a Component that performs a cast of each metric to a sum.
func CastToSum(metrics ...string) Component {
	return Component{
		Type: "casttosum",
		Config: map[string]interface{}{
			"metrics": metrics,
		},
	}
}

// CumulativeToDelta returns a Component that converts each cumulative metric to delta.
// The metric names are matched with match_type "strict".
func CumulativeToDelta(metrics ...string) Component {
	return Component{
		Type: "cumulativetodelta",
		Config: map[string]interface{}{
			"include": map[string]interface{}{
				"metrics":    metrics,
				"match_type": "strict",
			},
		},
	}
}

// DeltaToRate returns a Component that converts each delta metric to a gauge rate.
func DeltaToRate(metrics ...string) Component {
	return Component{
		Type: "deltatorate",
		Config: map[string]interface{}{
			"metrics": metrics,
		},
	}
}

// AddPrefix returns a config snippet that adds a domain prefix to all metrics.
// The prefix and the original metric name are combined with path.Join, so
// they end up separated by a single slash. Any operations are attached to the
// resulting rename.
func AddPrefix(prefix string, operations ...map[string]interface{}) map[string]interface{} {
	return RegexpRename(
		`^(.*)$`,
		path.Join(prefix, `${1}`),
		operations...,
	)
}

// ChangePrefix returns a config snippet that updates a prefix on all metrics.
// oldPrefix is inserted into the matching regular expression verbatim, so any
// regexp metacharacters it contains are interpreted as such.
func ChangePrefix(oldPrefix, newPrefix string) map[string]interface{} {
	return RegexpRename(
		fmt.Sprintf(`^%s(.*)$`, oldPrefix),
		fmt.Sprintf("%s%s", newPrefix, `${1}`),
	)
}

// RegexpRename returns a config snippet that renames metrics matching the given regexp.
// The `rename` argument supports capture groups as `${1}`, `${2}`, and so on.
// The "operations" key is only set when at least one operation is passed.
func RegexpRename(regexp string, rename string, operations ...map[string]interface{}) map[string]interface{} {
	// Every $ is doubled to $$ so that it reaches the processor as a literal $
	// (presumably because the collector expands $-prefixed references while
	// loading its config); see
	// https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/metricstransformprocessor#rename-multiple-metrics-using-substitution
	out := map[string]interface{}{
		"include":    strings.ReplaceAll(regexp, "$", "$$"),
		"match_type": "regexp",
		"action":     "update",
		"new_name":   strings.ReplaceAll(rename, "$", "$$"),
	}
	if len(operations) > 0 {
		out["operations"] = operations
	}
	return out
}

// Transform returns a transform processor object that executes statements on statementType data.
// The statements are placed under the "<statementType>_statements" key with
// the given context, and error_mode is set to "ignore".
func Transform(statementType, context string, statements ottl.Statements) Component {
	return Component{
		Type: "transform",
		Config: map[string]any{
			"error_mode": "ignore",
			fmt.Sprintf("%s_statements", statementType): []map[string]any{
				{
					"context":    context,
					"statements": statements,
				},
			},
		},
	}
}

// Filter returns a filter processor object that drops dataType.context data matching any of the expressions.
// Each expression is rendered with its String method; error_mode is set to "ignore".
func Filter(dataType, context string, expressions []ottl.Value) Component {
	// Deliberately not named "strings": that would shadow the imported
	// strings package for the rest of the function.
	var conditions []string
	for _, e := range expressions {
		conditions = append(conditions, e.String())
	}
	return Component{
		Type: "filter",
		Config: map[string]any{
			"error_mode": "ignore",
			dataType: map[string]any{
				context: conditions,
			},
		},
	}
}

// TransformationMetrics returns a transform processor object that contains all the queries passed into it.
func TransformationMetrics(queries ...TransformQuery) Component { metricQueryStrings := []string{} datapointQueryStrings := []string{} for _, q := range queries { switch q.Context { case Metric: metricQueryStrings = append(metricQueryStrings, string(q.Statement)) default: datapointQueryStrings = append(datapointQueryStrings, string(q.Statement)) } } // Only add metricStatement type if any queuries to go with it metricStatements := []map[string]any{} if len(metricQueryStrings) != 0 { metricMetricStatement := map[string]any{ "context": "metric", "statements": metricQueryStrings, } metricStatements = append(metricStatements, metricMetricStatement) } if len(datapointQueryStrings) != 0 { datapointMetricStatement := map[string]any{ "context": "datapoint", "statements": datapointQueryStrings, } metricStatements = append(metricStatements, datapointMetricStatement) } return Component{ Type: "transform", Config: map[string]any{ "metric_statements": metricStatements, }, } } // TransformQueryContext is a type wrapper for the context of a query expression within the transoform processor type TransformQueryContext string const ( Metric TransformQueryContext = "metric" Datapoint TransformQueryContext = "datapoint" ) // TransformQuery is a type wrapper for query expressions supported by the transform // processor found here: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor type TransformQuery struct { Context TransformQueryContext Statement string } // FlattenResourceAttribute returns an expression that brings down a resource attribute to a // metric attribute. func FlattenResourceAttribute(resourceAttribute, metricAttribute string) TransformQuery { return TransformQuery{ Context: Datapoint, Statement: fmt.Sprintf(`set(attributes["%s"], resource.attributes["%s"])`, metricAttribute, resourceAttribute), } } // GroupByAttribute returns an expression that makes a metric attribute into a resource attribute. 
func GroupByAttribute(attribute string) TransformQuery { return TransformQuery{ Context: Datapoint, Statement: fmt.Sprintf(`set(resource.attributes["%s"], attributes["%s"])`, attribute, attribute), } } // DeleteMetricAttribute returns an expression that removes the metric attribute specified. func DeleteMetricAttribute(metricAttribute string) TransformQuery { return TransformQuery{ Context: Datapoint, Statement: fmt.Sprintf(`delete_key(attributes, "%s")`, metricAttribute), } } // ConvertGaugeToSum returns an expression where a gauge metric can be converted into a sum func ConvertGaugeToSum(metricName string) TransformQuery { return TransformQuery{ Context: Metric, Statement: fmt.Sprintf(`convert_gauge_to_sum("cumulative", true) where name == "%s"`, metricName), } } // ConvertFloatToInt returns an expression where a float-valued metric can be converted to an int func ConvertFloatToInt(metricName string) TransformQuery { return TransformQuery{ Context: Datapoint, Statement: fmt.Sprintf(`set(value_int, Int(value_double)) where metric.name == "%s"`, metricName), } } // SetDescription returns a metrics transform expression where the metrics description will be set to what is provided func SetDescription(metricName, metricDescription string) TransformQuery { return TransformQuery{ Context: Datapoint, Statement: fmt.Sprintf(`set(metric.description, "%s") where metric.name == "%s"`, metricDescription, metricName), } } // SetUnit returns a metrics transform expression where the metric unit is set to provided value func SetUnit(metricName, unit string) TransformQuery { return TransformQuery{ Context: Datapoint, Statement: fmt.Sprintf(`set(metric.unit, "%s") where metric.name == "%s"`, unit, metricName), } } // SetName returns a metrics transform expression where the metric name is set to provided value func SetName(oldName, newName string) TransformQuery { return TransformQuery{ Context: Datapoint, Statement: fmt.Sprintf(`set(metric.name, "%s") where metric.name == "%s"`, 
newName, oldName), } } func SetAttribute(metricName, attributeKey, attributeValue string) TransformQuery { return TransformQuery{ Context: Datapoint, Statement: fmt.Sprintf(`set(attributes["%s"], "%s") where metric.name == "%s"`, attributeKey, attributeValue, metricName), } } // SummarySumValToSum creates a new Sum metric out of a summary metric's sum value. The new metric has a name of "<Old Name>_sum". func SummarySumValToSum(metricName, aggregation string, isMonotonic bool) TransformQuery { return TransformQuery{ Context: Datapoint, Statement: fmt.Sprintf(`convert_summary_sum_val_to_sum("%s", %t) where metric.name == "%s"`, aggregation, isMonotonic, metricName), } } // SummaryCountValToSum creates a new Sum metric out of a summary metric's count value. The new metric has a name of "<Old Name>_count". func SummaryCountValToSum(metricName, aggregation string, isMonotonic bool) TransformQuery { return TransformQuery{ Context: Datapoint, Statement: fmt.Sprintf(`convert_summary_count_val_to_sum("%s", %t) where metric.name == "%s"`, aggregation, isMonotonic, metricName), } } // RetainResource retains the resource attributes provided, and drops all other attributes. func RetainResource(resourceAttributeKeys ...string) TransformQuery { return TransformQuery{ Context: Datapoint, Statement: fmt.Sprintf(`keep_keys(resource.attributes, [%s])`, strings.Join(resourceAttributeKeys[:], ",")), } } // RenameMetric returns a config snippet that renames old to new, applying zero or more transformations. 
func RenameMetric(old, new string, operations ...map[string]interface{}) map[string]interface{} {
	snippet := map[string]interface{}{
		"action":   "update",
		"include":  old,
		"new_name": new,
	}
	if len(operations) == 0 {
		return snippet
	}
	snippet["operations"] = operations
	return snippet
}

// UpdateMetric returns a config snippet that applies transformations to the
// given metric name. The "operations" key is only present when at least one
// operation is passed.
func UpdateMetric(metric string, operations ...map[string]interface{}) map[string]interface{} {
	snippet := map[string]interface{}{
		"action":  "update",
		"include": metric,
	}
	if len(operations) == 0 {
		return snippet
	}
	snippet["operations"] = operations
	return snippet
}

// UpdateMetricRegexp returns a config snippet that applies transformations to
// every metric whose name matches the regular expression metricRegex.
func UpdateMetricRegexp(metricRegex string, operations ...map[string]interface{}) map[string]interface{} {
	snippet := map[string]interface{}{
		"action":     "update",
		"include":    metricRegex,
		"match_type": "regexp",
	}
	if len(operations) == 0 {
		return snippet
	}
	snippet["operations"] = operations
	return snippet
}

// DuplicateMetric returns a config snippet that copies old to new (action
// "insert"), applying zero or more transformations.
func DuplicateMetric(old, new string, operations ...map[string]interface{}) map[string]interface{} {
	snippet := map[string]interface{}{
		"action":   "insert",
		"include":  old,
		"new_name": new,
	}
	if len(operations) == 0 {
		return snippet
	}
	snippet["operations"] = operations
	return snippet
}

// CombineMetrics returns a config snippet that combines the metrics matching
// the regex old into a single metric named new (action "combine", with
// submatch_case "lower"), applying zero or more transformations.
func CombineMetrics(old, new string, operations ...map[string]interface{}) map[string]interface{} {
	snippet := map[string]interface{}{
		"action":        "combine",
		"include":       old,
		"match_type":    "regexp",
		"new_name":      new,
		"submatch_case": "lower",
	}
	if len(operations) == 0 {
		return snippet
	}
	snippet["operations"] = operations
	return snippet
}

// ToggleScalarDataType transforms int -> double and double -> int.
var ToggleScalarDataType = map[string]interface{}{
	"action": "toggle_scalar_data_type",
}

// AddLabel adds a label with a fixed value.
func AddLabel(key, value string) map[string]interface{} {
	op := map[string]interface{}{
		"action":    "add_label",
		"new_label": key,
		"new_value": value,
	}
	return op
}

// RenameLabel returns an operation that renames the label old to new.
func RenameLabel(old, new string) map[string]interface{} {
	op := map[string]interface{}{
		"action":    "update_label",
		"label":     old,
		"new_label": new,
	}
	return op
}

// RenameLabelValues returns an operation that renames values of the given
// label. transforms maps each old value to its new value; the resulting
// value_actions are ordered by old value so the output is deterministic.
func RenameLabelValues(label string, transforms map[string]string) map[string]interface{} {
	// Map iteration order is random, so sort the old values first.
	oldValues := make([]string, 0, len(transforms))
	for oldValue := range transforms {
		oldValues = append(oldValues, oldValue)
	}
	sort.Strings(oldValues)

	// Left nil when there is nothing to rename.
	var valueActions []map[string]string
	for _, oldValue := range oldValues {
		valueActions = append(valueActions, map[string]string{
			"value":     oldValue,
			"new_value": transforms[oldValue],
		})
	}

	return map[string]interface{}{
		"action":        "update_label",
		"label":         label,
		"value_actions": valueActions,
	}
}

// DeleteLabelValue removes streams with the given label value.
func DeleteLabelValue(label, value string) map[string]interface{} {
	op := map[string]interface{}{
		"action":      "delete_label_value",
		"label":       label,
		"label_value": value,
	}
	return op
}

// ScaleValue multiplies the value by factor.
func ScaleValue(factor float64) map[string]interface{} {
	op := map[string]interface{}{
		"action":             "experimental_scale_value",
		"experimental_scale": factor,
	}
	return op
}

// AggregateLabels removes all labels except those in the passed list, aggregating values using aggregationType.
func AggregateLabels(aggregationType string, labels ...string) map[string]interface{} { return map[string]interface{}{ "action": "aggregate_labels", "label_set": labels, "aggregation_type": aggregationType, } } // AggregateLabelValues combines the given values into a single value func AggregateLabelValues(aggregationType string, label string, new string, old ...string) map[string]interface{} { return map[string]interface{}{ "action": "aggregate_label_values", "aggregation_type": aggregationType, "label": label, "new_value": new, "aggregated_values": old, } } // CondenseResourceMetrics merges multiple resource metrics on // a slice of metrics to a single resource metrics payload, if they have the same // resource. func CondenseResourceMetrics() Component { return Component{ Type: "groupbyattrs", Config: map[string]any{}, } } // ModifyInstrumentationScope sets the instrumentation scope name and version // fields which will later be exported to Cloud Monitoring metric labels. // The name will always be prefixed with "agent.googleapis.com/". func ModifyInstrumentationScope(name string, version string) Component { return Component{ Type: "modifyscope", Config: map[string]interface{}{ "override_scope_name": "agent.googleapis.com/" + name, "override_scope_version": version, }, } } // GroupByGMPAttrs moves the "namespace", "cluster", and "location" // metric attributes to resource attributes. The // googlemanagedprometheus exporter will use these resource attributes // to populate metric labels. func GroupByGMPAttrs() Component { return Component{ Type: "groupbyattrs", Config: map[string]interface{}{ "keys": []string{"namespace", "cluster", "location"}, }, } } // GroupByGMPAttrs_OTTL moves the "namespace", "cluster", and "location" // metric attributes to resource attributes similar to GroupByGMPAttrs. 
// The difference here is it uses OTTL instead of the groupbyattrs processor // since that processor discards certain metadata from the metrics that are // important to preserve prometheus untyped metrics. // // See more detail in https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/33419. func GroupByGMPAttrs_OTTL() Component { return TransformationMetrics( GroupByAttribute("location"), GroupByAttribute("cluster"), GroupByAttribute("namespace"), DeleteMetricAttribute("location"), DeleteMetricAttribute("cluster"), DeleteMetricAttribute("namespace"), ) } // GCPResourceDetector returns a resourcedetection processor configured for only GCP. func GCPResourceDetector(override bool) Component { config := map[string]interface{}{ "detectors": []string{"gcp"}, } if override != true { // override defaults to true; omit it in that case to prevent config diffs. config["override"] = override } return Component{ Type: "resourcedetection", Config: config, } } // ResourceTransform returns a Component that applies changes on resource attributes. func ResourceTransform(attributes map[string]string, override bool) Component { a := []map[string]interface{}{} keys := make([]string, 0, len(attributes)) for k := range attributes { keys = append(keys, k) } sort.Strings(keys) action := "insert" if override { action = "upsert" } for _, v := range keys { a = append(a, map[string]interface{}{ "key": v, "value": attributes[v], "action": action, }) } config := map[string]interface{}{ "attributes": a, } return Component{ Type: "resource", Config: config, } }