plugins/outputs/cloudwatchlogs/cloudwatchlogs.go

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package cloudwatchlogs

import (
    "encoding/json"
    "fmt"
    "regexp"
    "strings"
    "sync"
    "time"

    "github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/influxdata/telegraf"
    "github.com/influxdata/telegraf/plugins/outputs"
    "go.uber.org/zap"

    configaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws"
    "github.com/aws/amazon-cloudwatch-agent/cfg/envconfig"
    "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth"
    "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent"
    "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/useragent"
    "github.com/aws/amazon-cloudwatch-agent/handlers"
    "github.com/aws/amazon-cloudwatch-agent/internal"
    "github.com/aws/amazon-cloudwatch-agent/internal/retryer"
    "github.com/aws/amazon-cloudwatch-agent/logs"
    "github.com/aws/amazon-cloudwatch-agent/plugins/outputs/cloudwatchlogs/internal/pusher"
    "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
    "github.com/aws/amazon-cloudwatch-agent/tool/util"
)

const (
    LogGroupNameTag     = "log_group_name"
    LogStreamNameTag    = "log_stream_name"
    LogGroupClassTag    = "log_group_class"
    LogTimestampField   = "log_timestamp"
    LogEntryField       = "value"
    defaultFlushTimeout = 5 * time.Second
    maxRetryTimeout     = 14*24*time.Hour + 10*time.Minute
    metricRetryTimeout  = 2 * time.Minute
    attributesInFields  = "attributesInFields"
)

var (
    containerInsightsRegexp = regexp.MustCompile("^/aws/.*containerinsights/.*/(performance|prometheus)$")
)

type CloudWatchLogs struct {
    Region           string `toml:"region"`
    RegionType       string `toml:"region_type"`
    Mode             string `toml:"mode"`
    EndpointOverride string `toml:"endpoint_override"`
    AccessKey        string `toml:"access_key"`
    SecretKey        string `toml:"secret_key"`
    RoleARN          string `toml:"role_arn"`
    Profile          string `toml:"profile"`
    Filename         string `toml:"shared_credential_file"`
    Token            string `toml:"token"`

    //log group and stream names
    LogStreamName string `toml:"log_stream_name"`
    LogGroupName  string `toml:"log_group_name"`

    // Retention for log group
    RetentionInDays int `toml:"retention_in_days"`
    Concurrency     int `toml:"concurrency"`

    ForceFlushInterval internal.Duration `toml:"force_flush_interval"` // unit is second

    Log telegraf.Logger `toml:"-"`

    pusherStopChan  chan struct{}
    pusherWaitGroup sync.WaitGroup
    cwDests         map[pusher.Target]*cwDest
    workerPool      pusher.WorkerPool
    targetManager   pusher.TargetManager
    once            sync.Once
    middleware      awsmiddleware.Middleware
}

func (c *CloudWatchLogs) Connect() error {
    return nil
}

func (c *CloudWatchLogs) Close() error {
    close(c.pusherStopChan)
    c.pusherWaitGroup.Wait()

    for _, d := range c.cwDests {
        d.Stop()
    }

    if c.workerPool != nil {
        c.workerPool.Stop()
    }

    return nil
}

func (c *CloudWatchLogs) Write(metrics []telegraf.Metric) error {
    for _, m := range metrics {
        c.writeMetricAsStructuredLog(m)
    }
    return nil
}

// CreateDest returns a log destination for the given group/stream, falling back
// to the plugin-level defaults when the caller passes empty values. Retention
// values <= 0 are normalized to -1 (leave the log group's retention unchanged).
func (c *CloudWatchLogs) CreateDest(group, stream string, retention int, logGroupClass string, logSrc logs.LogSrc) logs.LogDest {
    if group == "" {
        group = c.LogGroupName
    }
    if stream == "" {
        stream = c.LogStreamName
    }
    if retention <= 0 {
        retention = -1
    }

    t := pusher.Target{
        Group:     group,
        Stream:    stream,
        Retention: retention,
        Class:     logGroupClass,
    }
    return c.getDest(t, logSrc)
}

// getDest returns the cached destination for the target, or builds a new
// pusher-backed destination and caches it.
func (c *CloudWatchLogs) getDest(t pusher.Target, logSrc logs.LogSrc) *cwDest {
    if cwd, ok := c.cwDests[t]; ok {
        return cwd
    }

    logThrottleRetryer := retryer.NewLogThrottleRetryer(c.Log)
    client := c.createClient(logThrottleRetryer)

    agent.UsageFlags().SetValue(agent.FlagRegionType, c.RegionType)
    agent.UsageFlags().SetValue(agent.FlagMode, c.Mode)

    if containerInsightsRegexp.MatchString(t.Group) {
        useragent.Get().SetContainerInsightsFlag()
    }

    c.once.Do(func() {
        if c.Concurrency > 0 {
            c.workerPool = pusher.NewWorkerPool(c.Concurrency)
        }
        c.targetManager = pusher.NewTargetManager(c.Log, client)
    })

    p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, maxRetryTimeout, c.pusherStopChan, &c.pusherWaitGroup)
    cwd := &cwDest{pusher: p, retryer: logThrottleRetryer}
    c.cwDests[t] = cwd
    return cwd
}

// createClient builds the CloudWatch Logs SDK client with the configured
// credential chain, retryer, and request handlers (gzip compression for
// PutLogEvents plus optional agent-health middleware).
func (c *CloudWatchLogs) createClient(retryer aws.RequestRetryer) *cloudwatchlogs.CloudWatchLogs {
    credentialConfig := &configaws.CredentialConfig{
        Region:    c.Region,
        AccessKey: c.AccessKey,
        SecretKey: c.SecretKey,
        RoleARN:   c.RoleARN,
        Profile:   c.Profile,
        Filename:  c.Filename,
        Token:     c.Token,
    }
    client := cloudwatchlogs.New(
        credentialConfig.Credentials(),
        &aws.Config{
            Endpoint: aws.String(c.EndpointOverride),
            Retryer:  retryer,
            LogLevel: configaws.SDKLogLevel(),
            Logger:   configaws.SDKLogger{},
        },
    )
    client.Handlers.Build.PushBackNamed(handlers.NewRequestCompressionHandler([]string{"PutLogEvents"}))
    if c.middleware != nil {
        if err := awsmiddleware.NewConfigurer(c.middleware.Handlers()).Configure(awsmiddleware.SDKv1(&client.Handlers)); err != nil {
            c.Log.Errorf("Unable to configure middleware on cloudwatch logs client: %v", err)
        } else {
            c.Log.Debug("Configured middleware on AWS client")
        }
    }
    return client
}

// writeMetricAsStructuredLog converts a metric into a structured (EMF) log
// event and queues it on the destination derived from the metric's tags.
func (c *CloudWatchLogs) writeMetricAsStructuredLog(m telegraf.Metric) {
    t, err := c.getTargetFromMetric(m)
    if err != nil {
        c.Log.Errorf("Failed to find target: %v", err)
    }
    cwd := c.getDest(t, nil)
    if cwd == nil {
        c.Log.Warnf("unable to find log destination, group: %v, stream: %v", t.Group, t.Stream)
        return
    }
    cwd.switchToEMF()
    cwd.pusher.Sender.SetRetryDuration(metricRetryTimeout)

    e := c.getLogEventFromMetric(m)
    if e == nil {
        return
    }

    cwd.AddEvent(e)
}

func (c *CloudWatchLogs) getTargetFromMetric(m telegraf.Metric) (pusher.Target, error) {
    tags := m.Tags()
    logGroup, ok := tags[LogGroupNameTag]
    if !ok {
        return pusher.Target{}, fmt.Errorf("structuredlog received a metric with name '%v' without log group name", m.Name())
    } else {
        m.RemoveTag(LogGroupNameTag)
    }

    logStream, ok := tags[LogStreamNameTag]
    if ok {
        m.RemoveTag(LogStreamNameTag)
    } else if logStream == "" {
        logStream = c.LogStreamName
    }

    return pusher.Target{Group: logGroup, Stream: logStream, Class: util.StandardLogGroupClass, Retention: -1}, nil
}

func (c *CloudWatchLogs) getLogEventFromMetric(metric telegraf.Metric) *structuredLogEvent {
    var message string
    if metric.HasField(LogEntryField) {
        var ok bool
        if message, ok = metric.Fields()[LogEntryField].(string); !ok {
            c.Log.Warnf("The log entry value field is not string type: %v", metric.Fields())
            return nil
        }
    } else {
        content := map[string]interface{}{}
        tags := metric.Tags()
        // build all the attributesInFields
        if val, ok := tags[attributesInFields]; ok {
            attributes := strings.Split(val, ",")
            mFields := metric.Fields()
            for _, attr := range attributes {
                if fieldVal, ok := mFields[attr]; ok {
                    content[attr] = fieldVal
                    metric.RemoveField(attr)
                }
            }
            metric.RemoveTag(attributesInFields)
            delete(tags, attributesInFields)
        }

        // build remaining attributes
        for k := range tags {
            content[k] = tags[k]
        }

        for k, v := range metric.Fields() {
            var value interface{}

            switch t := v.(type) {
            case int:
                value = float64(t)
            case int32:
                value = float64(t)
            case int64:
                value = float64(t)
            case uint:
                value = float64(t)
            case uint32:
                value = float64(t)
            case uint64:
                value = float64(t)
            case float64:
                value = t
            case bool:
                value = t
            case string:
                value = t
            case time.Time:
                value = float64(t.Unix())
            default:
                c.Log.Errorf("Detected unexpected fields (%s,%v) when encoding structured log event, value type %T is not supported", k, v, v)
                return nil
            }

            content[k] = value
        }

        jsonMap, err := json.Marshal(content)
        if err != nil {
            c.Log.Errorf("Unable to marshal structured log content: %v", err)
        }
        message = string(jsonMap)
    }

    return &structuredLogEvent{
        msg: message,
        t:   metric.Time(),
    }
}

type structuredLogEvent struct {
    msg string
    t   time.Time
}

func (e *structuredLogEvent) Message() string {
    return e.msg
}

func (e *structuredLogEvent) Time() time.Time {
    return e.t
}

func (e *structuredLogEvent) Done() {}

// cwDest adapts a pusher to the logs.LogDest interface and tracks whether the
// destination carries EMF (embedded metric format) payloads.
type cwDest struct {
    pusher *pusher.Pusher
    sync.Mutex
    isEMF   bool
    stopped bool
    retryer *retryer.LogThrottleRetryer
}

func (cd *cwDest) Publish(events []logs.LogEvent) error {
    for _, e := range events {
        if !cd.isEMF {
            msg := e.Message()
            if strings.HasPrefix(msg, "{") && strings.HasSuffix(msg, "}") && strings.Contains(msg, "\"CloudWatchMetrics\"") {
                cd.switchToEMF()
            }
        }
        cd.AddEvent(e)
    }
    if cd.stopped {
        return logs.ErrOutputStopped
    }
    return nil
}

func (cd *cwDest) Stop() {
    cd.retryer.Stop()
    cd.stopped = true
}

func (cd *cwDest) AddEvent(e logs.LogEvent) {
    // Drop events for metric path logs when queue is full
    if cd.isEMF {
        cd.pusher.AddEventNonBlocking(e)
    } else {
        cd.pusher.AddEvent(e)
    }
}

// switchToEMF marks the destination as EMF and adds the x-amzn-logs-format
// header to the underlying CloudWatch Logs client.
func (cd *cwDest) switchToEMF() {
    cd.Lock()
    defer cd.Unlock()
    if !cd.isEMF {
        cd.isEMF = true
        cwl, ok := cd.pusher.Service.(*cloudwatchlogs.CloudWatchLogs)
        if ok {
            cwl.Handlers.Build.PushBackNamed(handlers.NewCustomHeaderHandler("x-amzn-logs-format", "json/emf"))
        }
    }
}

// Description returns a one-sentence description on the Output
func (c *CloudWatchLogs) Description() string {
    return "Configuration for AWS CloudWatchLogs output."
}

var sampleConfig = `
  ## Amazon REGION
  region = "us-east-1"

  ## Amazon Credentials
  ## Credentials are loaded in the following order
  ## 1) Assumed credentials via STS if role_arn is specified
  ## 2) explicit credentials from 'access_key' and 'secret_key'
  ## 3) shared profile from 'profile'
  ## 4) environment variables
  ## 5) shared credentials file
  ## 6) EC2 Instance Profile
  #access_key = ""
  #secret_key = ""
  #token = ""
  #role_arn = ""
  #profile = ""
  #shared_credential_file = ""

  # The log stream name.
  log_stream_name = "<log_stream_name>"
`

// SampleConfig returns the default configuration of the Output
func (c *CloudWatchLogs) SampleConfig() string {
    return sampleConfig
}

func init() {
    outputs.Add("cloudwatchlogs", func() telegraf.Output {
        return &CloudWatchLogs{
            ForceFlushInterval: internal.Duration{Duration: defaultFlushTimeout},
            pusherStopChan:     make(chan struct{}),
            cwDests:            make(map[pusher.Target]*cwDest),
            middleware: agenthealth.NewAgentHealth(
                zap.NewNop(),
                &agenthealth.Config{
                    IsUsageDataEnabled:  envconfig.IsUsageDataEnabled(),
                    Stats:               &agent.StatsConfig{Operations: []string{"PutLogEvents"}},
                    IsStatusCodeEnabled: true,
                },
            ),
        }
    })
}
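
The standalone sketch below is not part of the file above; it is a minimal, hypothetical illustration of how the output registered by init() can be pulled from Telegraf's output registry and used as a log-destination factory via CreateDest and Publish. The demoEvent type, the testutil logger, and the region/group/stream literals are assumptions made for illustration only, and whether it builds depends on the Telegraf fork and module versions the agent pins; in the agent itself the Telegraf framework and the logs pipeline drive these calls.

// sketch.go: hypothetical usage of the "cloudwatchlogs" Telegraf output as a log backend.
package main

import (
    "time"

    "github.com/influxdata/telegraf/plugins/outputs"
    "github.com/influxdata/telegraf/testutil" // assumed: test logger implementing telegraf.Logger

    "github.com/aws/amazon-cloudwatch-agent/logs"
    cwl "github.com/aws/amazon-cloudwatch-agent/plugins/outputs/cloudwatchlogs"
    "github.com/aws/amazon-cloudwatch-agent/tool/util"
)

// demoEvent satisfies logs.LogEvent the same way structuredLogEvent does in the
// plugin: a message, a timestamp, and a no-op Done callback. It is illustrative only.
type demoEvent struct {
    msg string
    ts  time.Time
}

func (e demoEvent) Message() string { return e.msg }
func (e demoEvent) Time() time.Time { return e.ts }
func (e demoEvent) Done()           {}

func main() {
    // Build the plugin through the registered creator so the unexported state
    // (pusherStopChan, cwDests, middleware) is initialized exactly as init() does.
    plugin := outputs.Outputs["cloudwatchlogs"]().(*cwl.CloudWatchLogs)
    plugin.Log = testutil.Logger{}        // assumed logger; the framework normally injects this
    plugin.Region = "us-east-1"           // placeholder region
    plugin.LogGroupName = "example-group" // placeholder default group
    plugin.LogStreamName = "example-stream"

    // Empty group/stream fall back to the plugin-level defaults set above;
    // retention <= 0 is normalized to -1 (keep the group's existing retention).
    dest := plugin.CreateDest("", "", -1, util.StandardLogGroupClass, nil)

    // Publish queues the event on the pusher; EMF detection happens per message.
    _ = dest.Publish([]logs.LogEvent{demoEvent{msg: "hello from the sketch", ts: time.Now()}})

    _ = plugin.Close()
}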