plugins/outputs/cloudwatchlogs/internal/pusher/target.go (211 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package pusher
import (
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/influxdata/telegraf"
"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
)
// Tuning knobs for the background retention workers.
const (
	// retentionChannelSize is the buffer capacity of both the dlg
	// (describe-log-group) and prp (put-retention-policy) queues.
	retentionChannelSize = 100

	// Retry settings consumed by calculateBackoff and the retention workers.
	// The un-jittered delay starts at baseRetryDelay, doubles on each attempt
	// and is capped at maxRetryDelayTarget; each worker makes at most
	// numBackoffRetries attempts per target.
	// NOTE(review): an earlier comment estimated ~26.8s of total wait; the
	// un-jittered delays for attempts 0-4 are 1+2+4+8+10 = 25s before
	// withJitter is applied — confirm against withJitter.
	baseRetryDelay      = time.Second
	maxRetryDelayTarget = 10 * time.Second
	numBackoffRetries   = 5
)
// Target identifies a CloudWatch Logs destination: the log group and stream
// to write to, the log group class passed when the group has to be created
// (empty means none is sent), and the desired retention in days. A Retention
// of 0 or less means retention is left untouched.
type Target struct {
	Group     string
	Stream    string
	Class     string
	Retention int
}
// TargetManager prepares CloudWatch Logs destinations for pushers.
type TargetManager interface {
	// InitTarget makes sure the target's log group and stream exist,
	// returning an error if they could not be created.
	InitTarget(target Target) error

	// PutRetentionPolicy requests that the target's log group retention be
	// reconciled with Target.Retention. (The targetManager implementation
	// only enqueues the work; it does not wait for it.)
	PutRetentionPolicy(target Target)
}
// targetManager is the TargetManager implementation returned by
// NewTargetManager. Its dlg and prp queues are drained by goroutines started
// in NewTargetManager.
type targetManager struct {
	logger  telegraf.Logger
	service cloudWatchLogsService

	// cache records every Target that InitTarget has already set up.
	// InitTarget reads and writes it while holding mu.
	cache map[Target]struct{}
	mu    sync.Mutex

	// dlg queues targets whose existing retention must be described before
	// deciding whether to update it.
	dlg chan Target
	// prp queues targets whose retention policy should be put directly.
	prp chan Target
}
// NewTargetManager builds a TargetManager backed by service and starts its
// two background workers: one drains the describe-log-group queue and the
// other drains the put-retention-policy queue. No code in this file closes
// those queues, so the workers do not exit on their own.
func NewTargetManager(logger telegraf.Logger, service cloudWatchLogsService) TargetManager {
	manager := new(targetManager)
	manager.logger = logger
	manager.service = service
	manager.cache = make(map[Target]struct{})
	manager.dlg = make(chan Target, retentionChannelSize)
	manager.prp = make(chan Target, retentionChannelSize)

	go manager.processDescribeLogGroup()
	go manager.processPutRetentionPolicy()
	return manager
}
// InitTarget ensures the log group and stream for target exist the first time
// the target is seen, then caches it so later calls return nil immediately.
//
// When a positive retention is requested, it is queued for the background
// workers. A group reported as newly created goes straight to the
// put-retention queue. Otherwise the group is first described so an
// already-matching retention is left alone.
//
// The mutex is held for the whole call, including the AWS requests and the
// channel sends, so concurrent callers are serialized.
func (m *targetManager) InitTarget(target Target) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if _, seen := m.cache[target]; seen {
		return nil
	}

	created, err := m.createLogGroupAndStream(target)
	if err != nil {
		return err
	}

	switch {
	case target.Retention <= 0:
		// no retention requested; nothing to queue
	case created:
		m.logger.Debugf("sending new log group %v to prp channel", target.Group)
		m.prp <- target
	default:
		m.logger.Debugf("sending existing log group %v to dlg channel", target.Group)
		m.dlg <- target
	}

	m.cache[target] = struct{}{}
	return nil
}
// PutRetentionPolicy queues target for a retention check when it requests a
// positive retention; otherwise it does nothing. This is intended to be
// called by a newly created pusher. The target goes through the describe
// queue first rather than straight to PutRetentionPolicy, so a log group
// whose retention already matches is not updated.
func (m *targetManager) PutRetentionPolicy(target Target) {
	if target.Retention <= 0 {
		return
	}
	m.logger.Debugf("sending log group %v to dlg channel by pusher", target.Group)
	m.dlg <- target
}
// createLogGroupAndStream makes sure t's log stream exists. It creates the
// log group first only if stream creation failed with ResourceNotFound.
//
// It returns true only when it had to go through the create-group path and
// the follow-up stream creation then succeeded (or the stream already
// existed). On failure it returns the error from the last AWS call it made.
func (m *targetManager) createLogGroupAndStream(t Target) (bool, error) {
	streamErr := m.createLogStream(t)
	if m.isLogStreamCreated(streamErr, t.Stream) {
		return false, nil
	}
	m.logger.Debugf("creating stream %v fail due to: %v", t.Stream, streamErr)

	// Only a missing log group is recoverable here; anything else is returned as-is.
	awsErr, isAWSErr := streamErr.(awserr.Error)
	if !isAWSErr || awsErr.Code() != cloudwatchlogs.ErrCodeResourceNotFoundException {
		return false, streamErr
	}

	groupErr := m.createLogGroup(t)
	if !m.isLogGroupCreated(groupErr, t.Group) {
		m.logger.Debugf("creating group %v fail due to: %v", t.Group, groupErr)
		return false, groupErr
	}

	// The group now exists, so the stream creation can be retried once.
	m.logger.Debugf("retrying log stream %v", t.Stream)
	retryErr := m.createLogStream(t)
	if m.isLogStreamCreated(retryErr, t.Stream) {
		return true, nil
	}
	return false, retryErr
}
// isLogGroupCreated reports whether err means the log group now exists,
// either because the call succeeded or because the group was already there.
func (m *targetManager) isLogGroupCreated(err error, group string) bool {
	label := "log group " + group
	return m.isResourceCreated(err, label)
}
// isLogStreamCreated reports whether err means the log stream now exists,
// either because the call succeeded or because the stream was already there.
func (m *targetManager) isLogStreamCreated(err error, stream string) bool {
	label := "log stream " + stream
	return m.isResourceCreated(err, label)
}
// isResourceCreated treats a nil error, or an AWS ResourceAlreadyExists
// error, as "the resource exists". It logs at debug level in the
// already-exists case. Any other error yields false.
func (m *targetManager) isResourceCreated(err error, resourceName string) bool {
	if err == nil {
		return true
	}
	awsErr, ok := err.(awserr.Error)
	alreadyExists := ok && awsErr.Code() == cloudwatchlogs.ErrCodeResourceAlreadyExistsException
	if alreadyExists {
		m.logger.Debugf("%s was already created. %v\n", resourceName, err)
	}
	return alreadyExists
}
// createLogGroup issues CreateLogGroup for t.Group. LogGroupClass is
// included only when t.Class is non-empty. The service error is returned
// unwrapped so callers can inspect its AWS error code.
func (m *targetManager) createLogGroup(t Target) error {
	input := &cloudwatchlogs.CreateLogGroupInput{LogGroupName: &t.Group}
	if t.Class != "" {
		input.LogGroupClass = &t.Class
	}
	if _, err := m.service.CreateLogGroup(input); err != nil {
		return err
	}
	m.logger.Debugf("successfully created log group %v", t.Group)
	return nil
}
// createLogStream issues CreateLogStream for t.Stream in t.Group. The
// service error is returned unwrapped so callers can inspect its AWS error
// code.
func (m *targetManager) createLogStream(t Target) error {
	input := &cloudwatchlogs.CreateLogStreamInput{
		LogGroupName:  &t.Group,
		LogStreamName: &t.Stream,
	}
	if _, err := m.service.CreateLogStream(input); err != nil {
		return err
	}
	m.logger.Debugf("successfully created log stream %v", t.Stream)
	return nil
}
// processDescribeLogGroup drains the dlg queue. For each target it reads the
// log group's current retention. If that differs from the requested positive
// retention, it forwards the target to the prp queue; otherwise it does
// nothing.
//
// A failed lookup is retried with backoff, up to numBackoffRetries attempts
// in total. There is no sleep after the final failed attempt, because that
// would only delay the next queued target. Exhausting all attempts is
// reported once at error level. The loop returns only if dlg is closed.
func (m *targetManager) processDescribeLogGroup() {
	for target := range m.dlg {
		var err error
		for attempt := 0; attempt < numBackoffRetries; attempt++ {
			var currentRetention int
			currentRetention, err = m.getRetention(target)
			if err == nil {
				if currentRetention != target.Retention && target.Retention > 0 {
					m.logger.Debugf("queueing log group %v to update retention policy", target.Group)
					m.prp <- target
				}
				break // retention read successfully; updated only if it differed
			}
			if attempt == numBackoffRetries-1 {
				break // out of attempts; don't sleep before giving up
			}
			m.logger.Debugf("retrying to describe log group retention for target (%v) %v: %v", attempt, target, err)
			time.Sleep(m.calculateBackoff(attempt))
		}
		if err != nil {
			m.logger.Errorf("failed to describe log group retention for target %v after %d attempts: %v", target, numBackoffRetries, err)
		}
	}
}
// getRetention returns the retention in days currently configured on
// target.Group, or 0 when the group has no retention set.
//
// DescribeLogGroups only filters by name prefix and returns results in pages.
// This function therefore follows NextToken across every page and matches the
// exact group name. Checking only the first page would falsely report
// "not found" for a group with many prefix-sharing siblings.
// Nil pointers in the response are read with the nil-safe aws helpers rather
// than dereferenced.
func (m *targetManager) getRetention(target Target) (int, error) {
	input := &cloudwatchlogs.DescribeLogGroupsInput{
		LogGroupNamePrefix: aws.String(target.Group),
	}
	for {
		output, err := m.service.DescribeLogGroups(input)
		if err != nil {
			return 0, fmt.Errorf("describe log groups failed: %w", err)
		}
		if output == nil {
			break
		}
		for _, group := range output.LogGroups {
			if group == nil || aws.StringValue(group.LogGroupName) != target.Group {
				continue
			}
			// A nil RetentionInDays means "never expire" and maps to 0.
			return int(aws.Int64Value(group.RetentionInDays)), nil
		}
		if aws.StringValue(output.NextToken) == "" {
			break
		}
		input.NextToken = output.NextToken
	}
	return 0, fmt.Errorf("log group %v not found", target.Group)
}
// processPutRetentionPolicy drains the prp queue and applies each target's
// retention via PutRetentionPolicy.
//
// A failed call is retried with backoff, up to numBackoffRetries attempts in
// total. There is no sleep after the final failed attempt, because that
// would only delay the next queued target. Exhausting all attempts is
// reported once at error level, together with the last error. The loop
// returns only if prp is closed.
func (m *targetManager) processPutRetentionPolicy() {
	for target := range m.prp {
		var err error
		for attempt := 0; attempt < numBackoffRetries; attempt++ {
			if err = m.updateRetentionPolicy(target); err == nil {
				break
			}
			if attempt == numBackoffRetries-1 {
				break // out of attempts; don't sleep before giving up
			}
			m.logger.Debugf("retrying to update retention policy for target (%v) %v: %v", attempt, target, err)
			time.Sleep(m.calculateBackoff(attempt))
		}
		if err != nil {
			m.logger.Errorf("failed to update retention policy for target %v after %d attempts: %v", target, numBackoffRetries, err)
		}
	}
}
// updateRetentionPolicy sets target.Group's retention to target.Retention
// days with a single PutRetentionPolicy call. It wraps any service error with
// context and returns it.
func (m *targetManager) updateRetentionPolicy(target Target) error {
	days := int64(target.Retention)
	request := &cloudwatchlogs.PutRetentionPolicyInput{
		LogGroupName:    aws.String(target.Group),
		RetentionInDays: &days,
	}
	if _, err := m.service.PutRetentionPolicy(request); err != nil {
		return fmt.Errorf("put retention policy failed: %w", err)
	}
	m.logger.Debugf("successfully updated retention policy for log group %v", target.Group)
	return nil
}
// calculateBackoff returns the jittered sleep to take before retry number
// retryCount (0-based).
//
// The un-jittered delay is baseRetryDelay doubled once per attempt and capped
// at maxRetryDelayTarget. Counts at or past numBackoffRetries use the cap
// directly; the doubling is only computed below that bound so the shift
// cannot overflow. Previously such counts fell back to baseRetryDelay, which
// made the backoff shrink. Negative counts are treated as 0 instead of
// panicking on a negative shift.
func (m *targetManager) calculateBackoff(retryCount int) time.Duration {
	if retryCount < 0 {
		retryCount = 0
	}
	delay := maxRetryDelayTarget
	if retryCount < numBackoffRetries {
		delay = baseRetryDelay * time.Duration(1<<int64(retryCount))
		if delay > maxRetryDelayTarget {
			delay = maxRetryDelayTarget
		}
	}
	return withJitter(delay)
}