internal/vulnerability/worker.go (130 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package vulnerability import ( "context" "fmt" "sync" "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/cloudbeat/internal/config" "github.com/elastic/cloudbeat/internal/dataprovider" "github.com/elastic/cloudbeat/internal/infra/clog" "github.com/elastic/cloudbeat/internal/resources/providers/awslib" "github.com/elastic/cloudbeat/internal/resources/providers/awslib/ec2" ) type VulnerabilityWorker struct { log *clog.Logger fetcher VulnerabilityFetcher replicator VulnerabilityReplicator verifier VulnerabilityVerifier evaluator VulnerabilityScanner runner VulnerabilityRunner eventsCreator EventsCreator manager *SnapshotManager } func NewVulnerabilityWorker(ctx context.Context, log *clog.Logger, c *config.Config, bdp dataprovider.CommonDataProvider, cdp dataprovider.ElasticCommonDataProvider) (*VulnerabilityWorker, error) { log.Debug("VulnerabilityWorker: New") awsConfig, err := awslib.InitializeAWSConfig(c.CloudConfig.Aws.Cred) if err != nil { return nil, fmt.Errorf("VulnerabilityWorker: failed to initialize AWS credentials: %w", err) } provider := ec2.NewCurrentRegionEC2Provider(ctx, log, "", *awsConfig, &awslib.MultiRegionClientFactory[ec2.Client]{}) manager := NewSnapshotManager(log, provider) fetcher := NewVulnerabilityFetcher(log, provider) replicator := NewVulnerabilityReplicator(log, manager) verifier := NewVulnerabilityVerifier(log, provider) runner, err := NewVulnerabilityRunner(ctx, log) if err != nil { return nil, fmt.Errorf("VulnerabilityWorker: could not get init NewVulnerabilityRunner: %w", err) } // TODO: Replace sequence with more generic approach evaluator, err := NewVulnerabilityScanner(log, runner, manager, c, time.Now()) if err != nil { return nil, fmt.Errorf("VulnerabilityWorker: could not get init NewVulnerabilityScanner: %w", err) } eventsCreator := NewEventsCreator(log, c, bdp, cdp) return &VulnerabilityWorker{ log: log, fetcher: fetcher, replicator: replicator, verifier: verifier, evaluator: evaluator, runner: runner, eventsCreator: eventsCreator, manager: manager, }, nil } func (f *VulnerabilityWorker) Run(ctx context.Context) { f.log.Info("Starting VulnerabilityWorker.work") defer func() { if err := f.runner.Close(ctx); err != nil { f.log.Warnf("error during runner closing %s", err.Error()) } }() if ctx.Err() != nil { f.log.Info("VulnerabilityWorker.work context canceled") return } defer f.manager.Cleanup(ctx) jobs := []struct { name string fn func(ctx context.Context) error }{ { name: "DeleteOldSnapshots", fn: func(ctx context.Context) error { f.manager.DeleteOldSnapshots(ctx) return nil }, }, { name: "FetchInstances", fn: f.fetcher.FetchInstances, }, { name: "SnapshotInstance", fn: func(ctx context.Context) error { f.replicator.SnapshotInstance(ctx, f.fetcher.GetChan()) return nil }, }, { name: "VerifySnapshot", fn: func(ctx context.Context) error { f.verifier.VerifySnapshot(ctx, f.replicator.GetChan()) return nil }, }, { name: "ScanSnapshot", fn: func(ctx context.Context) error { f.evaluator.ScanSnapshot(ctx, f.verifier.GetChan()) return nil }, }, { name: "CreateEvents", fn: func(ctx context.Context) error { f.eventsCreator.CreateEvents(ctx, f.evaluator.GetChan()) return nil }, }, } var wg sync.WaitGroup wg.Add(len(jobs)) for _, job := range jobs { go func() { defer wg.Done() err := job.fn(ctx) if err != nil { f.log.Errorf("VulnerabilityWorker.work job %s failed: %s", job.name, err.Error()) } else { f.log.Infof("VulnerabilityWorker.work job %s finished", job.name) } }() } f.log.Info("VulnerabilityWorker.work waiting on workers") wg.Wait() f.log.Info("VulnerabilityWorker.work finished waiting on workers") } func (f *VulnerabilityWorker) GetChan() chan []beat.Event { return f.eventsCreator.GetChan() }