tool/clean/clean_log_group/clean_log_group.go (223 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT package main import ( "context" "flag" "fmt" "log" "strings" "sync" "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types" "github.com/aws/amazon-cloudwatch-agent/tool/clean" ) type cloudwatchlogsClient interface { DeleteLogGroup(ctx context.Context, params *cloudwatchlogs.DeleteLogGroupInput, optFns ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.DeleteLogGroupOutput, error) DescribeLogGroups(ctx context.Context, params *cloudwatchlogs.DescribeLogGroupsInput, optFns ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.DescribeLogGroupsOutput, error) DescribeLogStreams(ctx context.Context, params *cloudwatchlogs.DescribeLogStreamsInput, optFns ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.DescribeLogStreamsOutput, error) } const ( LogGroupProcessChanSize = 500 ) // Config holds the application configuration type Config struct { creationThreshold time.Duration inactiveThreshold time.Duration numWorkers int deleteBatchCap int exceptionList []string dryRun bool } // Global configuration var ( cfg Config ) func init() { // Set default configuration cfg = Config{ creationThreshold: 3 * clean.KeepDurationOneDay, inactiveThreshold: 1 * clean.KeepDurationOneDay, numWorkers: 15, exceptionList: []string{"lambda"}, dryRun: true, } } func main() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) defer cancel() // Parse command line flags flag.BoolVar(&cfg.dryRun, "dry-run", false, "Enable dry-run mode (no actual deletion)") flag.Parse() // Load AWS configuration awsCfg, err := loadAWSConfig(ctx) if err != nil { log.Fatalf("Error loading AWS config: %v", err) } // Create CloudWatch Logs client client := cloudwatchlogs.NewFromConfig(awsCfg) // Compute cutoff times cutoffTimes := calculateCutoffTimes() log.Printf("🔍 Searching for CloudWatch Log Groups older than %d days AND inactive for %d days in %s region\n", cfg.creationThreshold, cfg.inactiveThreshold, awsCfg.Region) // Delete old log groups deletedGroups := deleteOldLogGroups(ctx, client, cutoffTimes) log.Printf("Total log groups deleted: %d", len(deletedGroups)) } type cutoffTimes struct { creation int64 inactive int64 } func calculateCutoffTimes() cutoffTimes { return cutoffTimes{ creation: time.Now().Add(cfg.creationThreshold).UnixMilli(), inactive: time.Now().Add(cfg.inactiveThreshold).UnixMilli(), } } func loadAWSConfig(ctx context.Context) (aws.Config, error) { cfg, err := config.LoadDefaultConfig(ctx) if err != nil { return aws.Config{}, fmt.Errorf("loading AWS config: %w", err) } cfg.RetryMode = aws.RetryModeAdaptive return cfg, nil } func deleteOldLogGroups(ctx context.Context, client cloudwatchlogsClient, times cutoffTimes) []string { var ( wg sync.WaitGroup deletedLogGroup []string foundLogGroupChan = make(chan *types.LogGroup, LogGroupProcessChanSize) deletedLogGroupNameChan = make(chan string, LogGroupProcessChanSize) handlerWg sync.WaitGroup ) // Start worker pool log.Printf("👷 Creating %d workers\n", cfg.numWorkers) for i := 0; i < cfg.numWorkers; i++ { wg.Add(1) w := worker{ id: i, wg: &wg, incomingLogGroupChan: foundLogGroupChan, deletedLogGroupChan: deletedLogGroupNameChan, times: times, } go w.processLogGroup(ctx, client) } // Start handler with its own WaitGroup handlerWg.Add(1) go func() { handleDeletedLogGroups(&deletedLogGroup, deletedLogGroupNameChan) handlerWg.Done() }() // Process log groups in batches if err := fetchAndProcessLogGroups(ctx, client, foundLogGroupChan); err != nil { log.Printf("Error processing log groups: %v", err) } close(foundLogGroupChan) wg.Wait() close(deletedLogGroupNameChan) handlerWg.Wait() return deletedLogGroup } func handleDeletedLogGroups(deletedLogGroups *[]string, deletedLogGroupNameChan chan string) { for logGroupName := range deletedLogGroupNameChan { *deletedLogGroups = append(*deletedLogGroups, logGroupName) log.Printf("🔍 Processed %d log groups so far\n", len(*deletedLogGroups)) } } type worker struct { id int wg *sync.WaitGroup incomingLogGroupChan <-chan *types.LogGroup deletedLogGroupChan chan<- string times cutoffTimes } func (w *worker) processLogGroup(ctx context.Context, client cloudwatchlogsClient) { defer w.wg.Done() for logGroup := range w.incomingLogGroupChan { if err := w.handleLogGroup(ctx, client, logGroup); err != nil { log.Printf("Worker %d: Error processing log group: %v", w.id, err) } } } func (w *worker) handleLogGroup(ctx context.Context, client cloudwatchlogsClient, logGroup *types.LogGroup) error { if logGroup.CreationTime == nil { return fmt.Errorf("log group has no creation time: %v", logGroup) } logGroupName := *logGroup.LogGroupName creationTime := *logGroup.CreationTime if creationTime >= w.times.creation { return nil } lastLogTime := getLastLogEventTime(ctx, client, logGroupName) if lastLogTime == 0 { return nil } if lastLogTime < w.times.inactive { log.Printf("🚨 Worker: %d| Old & Inactive Log Group: %s (Created: %v, Last Event: %v)\n", w.id, logGroupName, time.Unix(creationTime, 0), time.Unix(lastLogTime, 0)) w.deletedLogGroupChan <- logGroupName if cfg.dryRun { log.Printf("🛑 Dry-Run: Would delete log group: %s", logGroupName) return nil } return deleteLogGroup(ctx, client, logGroupName) } return nil } func deleteLogGroup(ctx context.Context, client cloudwatchlogsClient, logGroupName string) error { _, err := client.DeleteLogGroup(ctx, &cloudwatchlogs.DeleteLogGroupInput{ LogGroupName: aws.String(logGroupName), }) if err != nil { return fmt.Errorf("deleting log group %s: %w", logGroupName, err) } log.Printf("✅ Deleted log group: %s", logGroupName) return nil } func fetchAndProcessLogGroups(ctx context.Context, client cloudwatchlogsClient, logGroupChan chan<- *types.LogGroup) error { var nextToken *string describeCount := 0 for { output, err := client.DescribeLogGroups(ctx, &cloudwatchlogs.DescribeLogGroupsInput{ NextToken: nextToken, }) if err != nil { return fmt.Errorf("describing log groups: %w", err) } log.Printf("🔍 Described %d times | Found %d log groups\n", describeCount, len(output.LogGroups)) for _, logGroup := range output.LogGroups { if isLogGroupException(*logGroup.LogGroupName) { log.Printf("⏭️ Skipping Log Group: %s (in exception list)\n", *logGroup.LogGroupName) continue } logGroupChan <- &logGroup } if output.NextToken == nil { break } nextToken = output.NextToken describeCount++ } return nil } func getLastLogEventTime(ctx context.Context, client cloudwatchlogsClient, logGroupName string) int64 { var latestTimestamp int64 latestTimestamp = 0 output, err := client.DescribeLogStreams(ctx, &cloudwatchlogs.DescribeLogStreamsInput{ LogGroupName: aws.String(logGroupName), OrderBy: types.OrderByLastEventTime, Descending: aws.Bool(true), }) if err != nil { log.Printf("⚠️ Warning: Failed to retrieve log streams for %s: %v\n", logGroupName, err) return 0 } if len(output.LogStreams) == 0 { return 0 } stream := output.LogStreams[0] if stream.LastEventTimestamp != nil && *stream.LastEventTimestamp > latestTimestamp { latestTimestamp = *stream.LastEventTimestamp } return latestTimestamp } func isLogGroupException(logGroupName string) bool { for _, exception := range cfg.exceptionList { if strings.Contains(logGroupName, exception) { return true } } return false }