// internal/deployers/eksapi/janitor.go

package eksapi

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/aws/aws-k8s-tester/internal/awssdk"
	"github.com/aws/aws-k8s-tester/internal/metrics"
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/cloudformation"
	cloudformationtypes "github.com/aws/aws-sdk-go-v2/service/cloudformation/types"
	"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
	"k8s.io/klog/v2"
)

// NewJanitor constructs a janitor that deletes leaked eksapi CloudFormation
// stacks (and the resources behind them) older than maxResourceAge.
// When emitMetrics is true, deletion metrics are published to CloudWatch;
// otherwise a no-op registry is used. workers controls sweep parallelism
// (values <= 0 are clamped to 1). A non-empty stackStatus restricts the
// sweep to stacks whose status matches it exactly.
func NewJanitor(maxResourceAge time.Duration, emitMetrics bool, workers int, stackStatus string) *janitor {
	awsConfig := awssdk.NewConfig()
	var metricRegistry metrics.MetricRegistry
	if emitMetrics {
		metricRegistry = metrics.NewCloudWatchRegistry(cloudwatch.NewFromConfig(awsConfig))
	} else {
		metricRegistry = metrics.NewNoopMetricRegistry()
	}
	if workers <= 0 {
		workers = 1
	}
	return &janitor{
		maxResourceAge: maxResourceAge,
		workers:        workers,
		stackStatus:    stackStatus,
		awsConfig:      awsConfig,
		cfnClient:      cloudformation.NewFromConfig(awsConfig),
		metrics:        metricRegistry,
	}
}

// janitor sweeps leaked test resources, identified by their CloudFormation stacks.
type janitor struct {
	maxResourceAge time.Duration
	workers        int
	stackStatus    string
	awsConfig      aws.Config
	cfnClient      *cloudformation.Client
	metrics        metrics.MetricRegistry
}

// Sweep lists all CloudFormation stacks in the account/region and deletes the
// resources of every eksapi-owned stack that exceeds the janitor's maximum
// resource age. Deletions run on j.workers goroutines; individual failures
// are collected and returned as a single joined error.
func (j *janitor) Sweep(ctx context.Context) error {
	// Reuse the client built in NewJanitor instead of constructing a fresh
	// config/client on every sweep (the struct fields exist for this).
	stacks, err := j.getStacks(ctx, j.cfnClient)
	if err != nil {
		return fmt.Errorf("failed to get stacks: %w", err)
	}
	var wg sync.WaitGroup
	// Both channels are buffered to len(stacks) so that neither the producer
	// loop below nor the workers' error sends can ever block.
	stackQueue := make(chan cloudformationtypes.Stack, len(stacks))
	errChan := make(chan error, len(stacks))
	for i := 0; i < j.workers; i++ {
		wg.Add(1)
		go j.sweepWorker(&wg, stackQueue, errChan)
	}
	for _, stack := range stacks {
		stackQueue <- stack
	}
	close(stackQueue)
	wg.Wait()
	close(errChan)
	var errs []error
	for err := range errChan {
		errs = append(errs, err)
	}
	return errors.Join(errs...)
}

// getStacks returns every CloudFormation stack visible to cfnClient,
// following pagination to completion.
func (j *janitor) getStacks(ctx context.Context, cfnClient *cloudformation.Client) ([]cloudformationtypes.Stack, error) {
	var stacks []cloudformationtypes.Stack
	stackPaginator := cloudformation.NewDescribeStacksPaginator(cfnClient, &cloudformation.DescribeStacksInput{})
	for stackPaginator.HasMorePages() {
		page, err := stackPaginator.NextPage(ctx)
		if err != nil {
			return nil, err
		}
		stacks = append(stacks, page.Stacks...)
	}
	return stacks, nil
}

// sweepWorker drains stackQueue, deleting the resources behind each eligible
// stack. A stack is skipped when it is not eksapi-owned (ResourcePrefix),
// already deleted, filtered out by j.stackStatus, or younger than
// j.maxResourceAge. Failures are reported on errChan, which the caller must
// buffer large enough to never block.
func (j *janitor) sweepWorker(wg *sync.WaitGroup, stackQueue <-chan cloudformationtypes.Stack, errChan chan<- error) {
	defer wg.Done()
	for stack := range stackQueue {
		resourceID := *stack.StackName
		if !strings.HasPrefix(resourceID, ResourcePrefix) {
			continue
		}
		// Use the SDK's typed constant rather than a raw string literal.
		if stack.StackStatus == cloudformationtypes.StackStatusDeleteComplete {
			continue
		}
		if j.stackStatus != "" && j.stackStatus != string(stack.StackStatus) {
			klog.Infof("skipping resources (status: %v): %s", stack.StackStatus, resourceID)
			continue
		}
		resourceAge := time.Since(*stack.CreationTime)
		if resourceAge < j.maxResourceAge {
			klog.Infof("skipping resources (%v old): %s", resourceAge, resourceID)
			continue
		}
		clients := j.awsClientsForStack(stack)
		infraManager := NewInfrastructureManager(clients, resourceID, j.metrics)
		clusterManager := NewClusterManager(clients, resourceID)
		nodeManager := NewNodeManager(clients, resourceID)
		klog.Infof("deleting resources (%v old): %s", resourceAge, resourceID)
		if err := deleteResources(infraManager, clusterManager, nodeManager, nil /* k8sClient */, nil /* deployerOptions */); err != nil {
			errChan <- fmt.Errorf("failed to delete resources: %s: %w", resourceID, err)
		}
	}
}

// awsClientsForStack builds the AWS clients used to tear down a stack's
// resources, honoring a custom EKS endpoint recorded in the stack's tags.
// If the tag appears more than once, the last occurrence wins.
func (j *janitor) awsClientsForStack(stack cloudformationtypes.Stack) *awsClients {
	var eksEndpointURL string
	for _, tag := range stack.Tags {
		if *tag.Key == eksEndpointURLTag {
			eksEndpointURL = *tag.Value
		}
	}
	return newAWSClients(j.awsConfig, eksEndpointURL)
}