confgenerator/otel/processors.go (157 lines of code) (raw):

// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package otel import "fmt" // Helper functions to easily build up processor configs. // MetricsFilter returns a Component that filters metrics. // polarity should be "include" or "exclude". // matchType should be "strict" or "regexp". func MetricsFilter(polarity, matchType string, metricNames ...string) Component { return Component{ Type: "filter", Config: map[string]interface{}{ "metrics": map[string]interface{}{ polarity: map[string]interface{}{ "match_type": matchType, "metric_names": metricNames, }, }, }, } } // MetricsTransform returns a Component that performs the transformations specified as arguments. func MetricsTransform(metrics ...map[string]interface{}) Component { return Component{ Type: "metricstransform", Config: map[string]interface{}{ "transforms": metrics, }, } } // RenameMetric returns a config snippet that renames old to new, applying zero or more transformations. func RenameMetric(old, new string, operations ...map[string]interface{}) map[string]interface{} { out := map[string]interface{}{ "include": old, "action": "update", "new_name": new, } if len(operations) > 0 { out["operations"] = operations } return out } // CombineMetrics returns a config snippet that renames metrics matching the regex old to new, applying zero or more transformations. 
func CombineMetrics(old, new string, operations ...map[string]interface{}) map[string]interface{} { out := map[string]interface{}{ "include": old, "match_type": "regexp", "action": "combine", "new_name": new, "submatch_case": "lower", } if len(operations) > 0 { out["operations"] = operations } return out } // ToggleScalarDataType transforms int -> double and double -> int. var ToggleScalarDataType = map[string]interface{}{"action": "toggle_scalar_data_type"} // AddLabel adds a label with a fixed value. func AddLabel(key, value string) map[string]interface{} { return map[string]interface{}{ "action": "add_label", "new_label": key, "new_value": value, } } // RenameLabel renames old to new func RenameLabel(old, new string) map[string]interface{} { return map[string]interface{}{ "action": "update_label", "label": old, "new_label": new, } } // AggregateLabels removes all labels except those in the passed list, aggregating values using aggregationType. func AggregateLabels(aggregationType string, labels ...string) map[string]interface{} { return map[string]interface{}{ "action": "aggregate_labels", "label_set": labels, "aggregation_type": aggregationType, } } // GroupByGMPAttrs moves the "namespace" and "cluster" metric attributes to // resource attributes. // // Metrics coming from run-gmp-sidecar are written against the // `prometheus_target` monitored resource in Cloud Monitoring. The labels for // these monitored resources come from the OTel resource labels. As a result, // this processor needs to promote certain metric labels to resource labels so // the translation can happen correctly. // // See https://cloud.google.com/monitoring/api/resources#tag_prometheus_target // for more information about the monitored resource used. func GroupByGMPAttrs() Component { return Component{ Type: "groupbyattrs", Config: map[string]interface{}{ "keys": []string{"namespace", "cluster"}, }, } } // GCPResourceDetector returns a resourcedetection processor configured for only GCP. 
func GCPResourceDetector() Component { config := map[string]interface{}{ "detectors": []string{"gcp", "env"}, } return Component{ Type: "resourcedetection", Config: config, } } // AddResourceAttr adds a resource attribute using a processor. func AddResourceAttr(key, from string) Component { attributeConfig := map[string]interface{}{ "key": key, "from_attribute": from, "action": "insert", } config := map[string]interface{}{ "attributes": attributeConfig, } return Component{ Type: "resource", Config: config, } } // TransformationMetrics returns a transform processor object that contains all the queries passed into it. func TransformationMetrics(queries ...TransformQuery) Component { queryStrings := []string{} for _, q := range queries { queryStrings = append(queryStrings, string(q)) } datapointStatements := []map[string]any{ { "context": "datapoint", "statements": queryStrings, }, } return Component{ Type: "transform", Config: map[string]any{ "metric_statements": datapointStatements, }, } } // TransformQuery is a type wrapper for query expressions supported by the transform // processor found here: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor type TransformQuery string // FlattenResourceAttribute returns an expression that brings down a resource attribute to a // metric attribute. func FlattenResourceAttribute(resourceAttribute, metricAttribute string) TransformQuery { return TransformQuery(fmt.Sprintf(`set(attributes["%s"], resource.attributes["%s"])`, metricAttribute, resourceAttribute)) } // GroupByAttribute returns an expression that makes a metric attribute into a resource attribute. 
func GroupByAttribute(resourceAttribute, metricAttribute string) TransformQuery {
	// The metric attribute appears twice (source and nil guard), so it is
	// referenced by explicit argument index instead of being passed twice.
	const format = `set(resource.attributes["%[1]s"], attributes["%[2]s"]) where attributes["%[2]s"] != nil`
	return TransformQuery(fmt.Sprintf(format, resourceAttribute, metricAttribute))
}

// DeleteMetricAttribute returns an expression that removes the metric
// attribute named metricAttribute.
func DeleteMetricAttribute(metricAttribute string) TransformQuery {
	statement := fmt.Sprintf(`delete_key(attributes, "%s")`, metricAttribute)
	return TransformQuery(statement)
}

// PrefixResourceAttribute returns an expression that prefixes the resource
// attribute destResourceAttribute with the resource attribute
// srcResourceAttribute, joining the two with delimiter.
//
// Note: mutating the resource attribute results in this update happening for
// each data point. Because the OTTL statement uses the resource attribute as
// both target and source, every transformation after the first one for the
// same resource must be a no-op.
func PrefixResourceAttribute(destResourceAttribute, srcResourceAttribute, delimiter string) TransformQuery {
	// NOTE(review): the doubled "$$" is presumably the collector-config escape
	// for a literal "$" - confirm before changing the pattern.
	const format = `replace_pattern(resource.attributes["%s"], "^(\\d+)$$", Concat([resource.attributes["%s"], "$$1"], "%s"))`
	statement := fmt.Sprintf(format, destResourceAttribute, srcResourceAttribute, delimiter)
	return TransformQuery(statement)
}

// AddMetricLabel returns an expression that sets the metric attribute key to
// val. If the attribute already exists, it is overwritten.
func AddMetricLabel(key, val string) TransformQuery {
	statement := fmt.Sprintf(`set(attributes["%s"], "%s")`, key, val)
	return TransformQuery(statement)
}

// Transform returns a transform processor object that executes statements on statementType data.
func Transform(statementType, context string, statements []string) Component {
	// The config key depends on the kind of data, e.g. "<statementType>_statements".
	statementsKey := statementType + "_statements"
	group := map[string]any{
		"context":    context,
		"statements": statements,
	}
	return Component{
		Type: "transform",
		Config: map[string]any{
			"error_mode":  "ignore",
			statementsKey: []map[string]any{group},
		},
	}
}

// ExtractCountMetric returns the single statement that creates a new metric
// from the count value of the Histogram metric named metricName. monotonic is
// rendered as the literal true or false argument of extract_count_metric.
func ExtractCountMetric(monotonic bool, metricName string) []string {
	statement := fmt.Sprintf(`extract_count_metric(%t) where name == "%s"`, monotonic, metricName)
	return []string{statement}
}