extension/encoding/awscloudwatchmetricstreamsencodingextension/json_unmarshaler.go (194 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package awscloudwatchmetricstreamsencodingextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awscloudwatchmetricstreamsencodingextension" import ( "bufio" "bytes" "errors" "fmt" "strings" "time" gojson "github.com/goccy/go-json" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" conventions "go.opentelemetry.io/collector/semconv/v1.27.0" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awscloudwatchmetricstreamsencodingextension/internal/metadata" ) const ( attributeAWSCloudWatchMetricStreamName = "aws.cloudwatch.metric_stream_name" dimensionInstanceID = "InstanceId" namespaceDelimiter = "/" ) var ( errNoMetricName = errors.New("cloudwatch metric is missing metric name field") errNoMetricNamespace = errors.New("cloudwatch metric is missing namespace field") errNoMetricUnit = errors.New("cloudwatch metric is missing unit field") errNoMetricValue = errors.New("cloudwatch metric is missing value") ) type formatJSONUnmarshaler struct { buildInfo component.BuildInfo } var _ pmetric.Unmarshaler = (*formatJSONUnmarshaler)(nil) // The cloudwatchMetric is the format for the CloudWatch metric stream records. // // More details can be found at: // https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-json.html type cloudwatchMetric struct { // MetricStreamName is the name of the CloudWatch metric stream. MetricStreamName string `json:"metric_stream_name"` // AccountID is the AWS account ID associated with the metric. AccountID string `json:"account_id"` // Region is the AWS region for the metric. Region string `json:"region"` // Namespace is the CloudWatch namespace the metric is in. Namespace string `json:"namespace"` // MetricName is the name of the metric. MetricName string `json:"metric_name"` // Dimensions is a map of name/value pairs that help to // differentiate a metric. Dimensions map[string]string `json:"dimensions"` // Timestamp is the milliseconds since epoch for // the metric. Timestamp int64 `json:"timestamp"` // Value is the cloudwatchMetricValue, which has the min, max, // sum, and count. Value cloudwatchMetricValue `json:"value"` // Unit is the unit for the metric. // // More details can be found at: // https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html Unit string `json:"unit"` } // The cloudwatchMetricValue is the actual values of the CloudWatch metric. type cloudwatchMetricValue struct { isSet bool // Max is the highest value observed. Max float64 `json:"max"` // Min is the lowest value observed. Min float64 `json:"min"` // Sum is the sum of data points collected. Sum float64 `json:"sum"` // Count is the number of data points. Count float64 `json:"count"` } // validateMetric validates that the cloudwatch metric has been unmarshalled correctly func validateMetric(metric cloudwatchMetric) error { if metric.MetricName == "" { return errNoMetricName } if metric.Namespace == "" { return errNoMetricNamespace } if metric.Unit == "" { return errNoMetricUnit } if !metric.Value.isSet { return errNoMetricValue } return nil } // UnmarshalJSON unmarshalls the data to a cloudwatchMetricValue, // and sets isSet to true upon a successful execution func (v *cloudwatchMetricValue) UnmarshalJSON(data []byte) error { type valueType cloudwatchMetricValue if err := gojson.Unmarshal(data, (*valueType)(v)); err != nil { return err } v.isSet = true return nil } // resourceKey stores the metric attributes // that make a cloudwatchMetric unique to // a resource type resourceKey struct { metricStreamName string namespace string accountID string region string } // metricKey stores the metric attributes // that make a metric unique within // a resource type metricKey struct { name string unit string } func (c *formatJSONUnmarshaler) UnmarshalMetrics(record []byte) (pmetric.Metrics, error) { var errs []error byResource := make(map[resourceKey]map[metricKey]pmetric.Metric) // Multiple metrics in each record separated by newline character scanner := bufio.NewScanner(bytes.NewReader(record)) for datumIndex := 0; scanner.Scan(); datumIndex++ { var cwMetric cloudwatchMetric if err := gojson.Unmarshal(scanner.Bytes(), &cwMetric); err != nil { errs = append(errs, fmt.Errorf("error unmarshaling datum at index %d: %w", datumIndex, err)) byResource = map[resourceKey]map[metricKey]pmetric.Metric{} // free the memory continue } if err := validateMetric(cwMetric); err != nil { errs = append(errs, fmt.Errorf("invalid cloudwatch metric at index %d: %w", datumIndex, err)) byResource = map[resourceKey]map[metricKey]pmetric.Metric{} // free the memory continue } if len(errs) == 0 { // only add the metric if there are // no errors so far c.addMetricToResource(byResource, cwMetric) } } if err := scanner.Err(); err != nil { errs = append(errs, fmt.Errorf("error scanning for newline-delimited JSON: %w", err)) } if len(errs) > 0 { return pmetric.Metrics{}, errors.Join(errs...) } return c.createMetrics(byResource), nil } // addMetricToResource adds a new cloudwatchMetric to the // resource it belongs to according to resourceKey. It then // sets the data point for the cloudwatchMetric. func (c *formatJSONUnmarshaler) addMetricToResource( byResource map[resourceKey]map[metricKey]pmetric.Metric, cwMetric cloudwatchMetric, ) { rKey := resourceKey{ metricStreamName: cwMetric.MetricStreamName, namespace: cwMetric.Namespace, accountID: cwMetric.AccountID, region: cwMetric.Region, } metrics, ok := byResource[rKey] if !ok { metrics = make(map[metricKey]pmetric.Metric) byResource[rKey] = metrics } mKey := metricKey{ name: cwMetric.MetricName, unit: cwMetric.Unit, } metric, ok := metrics[mKey] if !ok { metric = pmetric.NewMetric() metric.SetName(mKey.name) metric.SetUnit(mKey.unit) metric.SetEmptySummary() metrics[mKey] = metric } dp := metric.Summary().DataPoints().AppendEmpty() dp.SetTimestamp(pcommon.NewTimestampFromTime(time.UnixMilli(cwMetric.Timestamp))) setDataPointAttributes(cwMetric, dp) dp.SetCount(uint64(cwMetric.Value.Count)) dp.SetSum(cwMetric.Value.Sum) minQ := dp.QuantileValues().AppendEmpty() minQ.SetQuantile(0) minQ.SetValue(cwMetric.Value.Min) maxQ := dp.QuantileValues().AppendEmpty() maxQ.SetQuantile(1) maxQ.SetValue(cwMetric.Value.Max) } // createMetrics creates pmetric.Metrics based on // on the extracted metrics of each resource func (c *formatJSONUnmarshaler) createMetrics( byResource map[resourceKey]map[metricKey]pmetric.Metric, ) pmetric.Metrics { metrics := pmetric.NewMetrics() for rKey, metricsMap := range byResource { rm := metrics.ResourceMetrics().AppendEmpty() setResourceAttributes(rKey, rm.Resource()) scopeMetrics := rm.ScopeMetrics().AppendEmpty() scopeMetrics.Scope().SetName(metadata.ScopeName) scopeMetrics.Scope().SetVersion(c.buildInfo.Version) for _, metric := range metricsMap { metric.MoveTo(scopeMetrics.Metrics().AppendEmpty()) } } return metrics } // setResourceAttributes sets attributes on a pcommon.Resource from a cloudwatchMetric. func setResourceAttributes(rKey resourceKey, resource pcommon.Resource) { attributes := resource.Attributes() attributes.PutStr(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAWS) attributes.PutStr(conventions.AttributeCloudAccountID, rKey.accountID) attributes.PutStr(conventions.AttributeCloudRegion, rKey.region) serviceNamespace, serviceName := toServiceAttributes(rKey.namespace) if serviceNamespace != "" { attributes.PutStr(conventions.AttributeServiceNamespace, serviceNamespace) } attributes.PutStr(conventions.AttributeServiceName, serviceName) attributes.PutStr(attributeAWSCloudWatchMetricStreamName, rKey.metricStreamName) } // toServiceAttributes splits the CloudWatch namespace into service namespace/name // if prepended by AWS/. Otherwise, it returns the CloudWatch namespace as the // service name with an empty service namespace func toServiceAttributes(namespace string) (serviceNamespace, serviceName string) { index := strings.Index(namespace, namespaceDelimiter) if index != -1 && strings.EqualFold(namespace[:index], conventions.AttributeCloudProviderAWS) { return namespace[:index], namespace[index+1:] } return "", namespace } // setDataPointAttributes sets attributes on a metric data point from a cloudwatchMetric. func setDataPointAttributes(metric cloudwatchMetric, dp pmetric.SummaryDataPoint) { attrs := dp.Attributes() for k, v := range metric.Dimensions { switch k { case dimensionInstanceID: attrs.PutStr(conventions.AttributeServiceInstanceID, v) default: attrs.PutStr(k, v) } } }