exporter/awsemfexporter/emf_exporter.go (165 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter"
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/google/uuid"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
)
// Supported values for the OutputDestination configuration option.
const (
	outputDestinationCloudWatch = "cloudwatch"
	outputDestinationStdout     = "stdout"
)

// Application Signals (AppSignals) EMF configuration values.
const (
	appSignalsMetricNamespace    = "ApplicationSignals"
	appSignalsLogGroupNamePrefix = "/aws/application-signals/"
)
type emfExporter struct {
pusherMap map[cwlogs.StreamKey]cwlogs.Pusher
svcStructuredLog *cwlogs.Client
config *Config
metricTranslator metricTranslator
pusherMapLock sync.Mutex
retryCnt int
collectorID string
}
// newEmfExporter builds an emfExporter from config and the collector-provided
// settings.
//
// It returns an error when config is nil, when the AWS config/session cannot
// be obtained, or when a random collector ID cannot be generated. As a side
// effect it stores set.Logger on config.logger.
func newEmfExporter(config *Config, set exporter.Settings) (*emfExporter, error) {
	if config == nil {
		return nil, errors.New("emf exporter config is nil")
	}
	config.logger = set.Logger

	// create AWS session
	awsConfig, session, err := awsutil.GetAWSConfigSession(set.Logger, &awsutil.Conn{}, &config.AWSSessionSettings)
	if err != nil {
		return nil, err
	}

	// Tag the user agent when Application Signals is enabled.
	var userAgentExtras []string
	if config.isAppSignalsEnabled() {
		userAgentExtras = append(userAgentExtras, "AppSignals")
	}

	// create CWLogs client with aws session config
	svcStructuredLog := cwlogs.NewClient(set.Logger,
		awsConfig,
		set.BuildInfo,
		config.LogGroupName,
		config.LogRetention,
		config.Tags,
		session,
		metadata.Type.String(),
		cwlogs.WithUserAgentExtras(userAgentExtras...),
	)

	// The collector ID makes the default log stream name unique per exporter instance.
	collectorIdentifier, err := uuid.NewRandom()
	if err != nil {
		return nil, err
	}

	// Named exp rather than emfExporter so the local does not shadow the type.
	exp := &emfExporter{
		svcStructuredLog: svcStructuredLog,
		config:           config,
		metricTranslator: newMetricTranslator(*config),
		retryCnt:         *awsConfig.MaxRetries,
		collectorID:      collectorIdentifier.String(),
		pusherMap:        map[cwlogs.StreamKey]cwlogs.Pusher{},
	}

	// Each fragment ends with a space so the concatenated message reads correctly.
	config.logger.Warn("the default value for DimensionRollupOption will be changing to NoDimensionRollup " +
		"in a future release. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/23997 for more " +
		"information")

	return exp, nil
}
// pushMetricsData translates md into EMF log events and delivers them to the
// configured output destination.
//
// With the "stdout" destination each event message is printed; with the
// "cloudwatch" destination events are added to the pusher for their stream
// and all pushers are force-flushed at the end. Any other destination value
// results in the events being translated but not emitted. The first
// translation, add, or flush error aborts processing and is returned; add and
// flush errors are passed through wrapErrorIfBadRequest first.
func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) error {
	rms := md.ResourceMetrics()

	// Collect resource attributes solely for the debug log lines below.
	labels := map[string]string{}
	for i := 0; i < rms.Len(); i++ {
		for k, v := range rms.At(i).Resource().Attributes().All() {
			labels[k] = v.Str()
		}
	}
	emf.config.logger.Debug("Start processing resource metrics", zap.Any("labels", labels))

	groupedMetrics := make(map[any]*groupedMetric)
	defaultLogStream := fmt.Sprintf("otel-stream-%s", emf.collectorID)

	// Currently we only support two options for "OutputDestination".
	// The comparisons are loop-invariant, so evaluate them once.
	outputDestination := emf.config.OutputDestination
	toStdout := strings.EqualFold(outputDestination, outputDestinationStdout)
	toCloudWatch := strings.EqualFold(outputDestination, outputDestinationCloudWatch)

	for i := 0; i < rms.Len(); i++ {
		if err := emf.metricTranslator.translateOTelToGroupedMetric(rms.At(i), groupedMetrics, emf.config); err != nil {
			return err
		}
	}

	for _, gm := range groupedMetrics {
		putLogEvent, err := translateGroupedMetricToEmf(gm, emf.config, defaultLogStream)
		if err != nil {
			return err
		}
		// A nil event carries nothing to emit. Guarding here protects the
		// CloudWatch branch, which would otherwise dereference it.
		if putLogEvent == nil {
			continue
		}

		switch {
		case toStdout:
			if putLogEvent.InputLogEvent != nil && putLogEvent.InputLogEvent.Message != nil {
				fmt.Println(*putLogEvent.InputLogEvent.Message)
			}
		case toCloudWatch:
			emfPusher := emf.getPusher(putLogEvent.StreamKey)
			if emfPusher == nil {
				continue
			}
			if addErr := emfPusher.AddLogEntry(putLogEvent); addErr != nil {
				return wrapErrorIfBadRequest(addErr)
			}
		}
	}

	if toCloudWatch {
		for _, emfPusher := range emf.listPushers() {
			if flushErr := emfPusher.ForceFlush(); flushErr != nil {
				// TODO now we only have one logPusher, so it's ok to return after first error occurred
				// NOTE: despite the wording of the log message, processing stops here.
				err := wrapErrorIfBadRequest(flushErr)
				emf.config.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(err))
				return err
			}
		}
	}

	emf.config.logger.Debug("Finish processing resource metrics", zap.Any("labels", labels))
	return nil
}
// getPusher returns the pusher cached for key, creating and caching a new one
// on first use.
//
// pusherMapLock is held for the whole lookup-or-create sequence so that
// concurrent callers (and listPushers, which takes the same lock) never touch
// pusherMap at the same time and never create two pushers for one key.
func (emf *emfExporter) getPusher(key cwlogs.StreamKey) cwlogs.Pusher {
	emf.pusherMapLock.Lock()
	defer emf.pusherMapLock.Unlock()

	pusher, ok := emf.pusherMap[key]
	if !ok {
		pusher = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger)
		emf.pusherMap[key] = pusher
	}
	return pusher
}
// listPushers returns a snapshot of every pusher currently held in pusherMap,
// taken while holding pusherMapLock. The order is unspecified (map iteration
// order) and the result is nil when the map is empty.
func (emf *emfExporter) listPushers() []cwlogs.Pusher {
	emf.pusherMapLock.Lock()
	defer emf.pusherMapLock.Unlock()

	var snapshot []cwlogs.Pusher
	for key := range emf.pusherMap {
		snapshot = append(snapshot, emf.pusherMap[key])
	}
	return snapshot
}
// shutdown is invoked when the exporter stops. It force-flushes every known
// pusher, logging (but not returning) any flush failure so the remaining
// pushers still get flushed, and then returns the result of shutting down the
// metric translator.
func (emf *emfExporter) shutdown(_ context.Context) error {
	for _, pusher := range emf.listPushers() {
		flushErr := pusher.ForceFlush()
		if flushErr == nil {
			continue
		}
		if wrapped := wrapErrorIfBadRequest(flushErr); wrapped != nil {
			emf.config.logger.Error("Error when gracefully shutting down emf_exporter. Skipping to next logPusher.", zap.Error(wrapped))
		}
	}
	return emf.metricTranslator.Shutdown()
}
// wrapErrorIfBadRequest marks err as permanent (via consumererror.NewPermanent)
// when it is an AWS request failure with a status code below 500. Any other
// error, including nil, is returned unchanged.
func wrapErrorIfBadRequest(err error) error {
	var requestFailure awserr.RequestFailure
	if !errors.As(err, &requestFailure) {
		return err
	}
	if requestFailure.StatusCode() >= 500 {
		return err
	}
	return consumererror.NewPermanent(err)
}