internal/vulnerability/worker.go (130 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package vulnerability
import (
"context"
"fmt"
"sync"
"time"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/cloudbeat/internal/config"
"github.com/elastic/cloudbeat/internal/dataprovider"
"github.com/elastic/cloudbeat/internal/infra/clog"
"github.com/elastic/cloudbeat/internal/resources/providers/awslib"
"github.com/elastic/cloudbeat/internal/resources/providers/awslib/ec2"
)
// VulnerabilityWorker bundles the stages of the vulnerability pipeline and
// drives them from Run. Each stage exposes its output through a channel that
// the next stage consumes.
type VulnerabilityWorker struct {
log *clog.Logger
// fetcher runs FetchInstances; its channel feeds the replicator.
fetcher VulnerabilityFetcher
// replicator runs SnapshotInstance on the fetcher's channel.
replicator VulnerabilityReplicator
// verifier runs VerifySnapshot on the replicator's channel.
verifier VulnerabilityVerifier
// evaluator runs ScanSnapshot on the verifier's channel.
evaluator VulnerabilityScanner
// runner is handed to the scanner at construction time and is closed when
// Run returns.
runner VulnerabilityRunner
// eventsCreator runs CreateEvents on the evaluator's channel; its own
// channel is what GetChan returns.
eventsCreator EventsCreator
// manager is used by Run for DeleteOldSnapshots and for the deferred Cleanup.
manager *SnapshotManager
}
// NewVulnerabilityWorker builds a VulnerabilityWorker together with all of
// its pipeline stages.
//
// It initializes the AWS configuration from c.CloudConfig.Aws.Cred, creates an
// EC2 provider through ec2.NewCurrentRegionEC2Provider, and constructs the
// snapshot manager, fetcher, replicator, verifier, runner, scanner and events
// creator. bdp and cdp are passed to the events creator unchanged.
//
// An error is returned when the AWS configuration, the runner or the scanner
// cannot be initialized. If the scanner fails after the runner has already
// been created, the runner is closed before returning so it is not leaked; a
// failure to close it is logged as a warning and the scanner error is still
// the one returned.
func NewVulnerabilityWorker(ctx context.Context, log *clog.Logger, c *config.Config, bdp dataprovider.CommonDataProvider, cdp dataprovider.ElasticCommonDataProvider) (*VulnerabilityWorker, error) {
	log.Debug("VulnerabilityWorker: New")

	awsConfig, err := awslib.InitializeAWSConfig(c.CloudConfig.Aws.Cred)
	if err != nil {
		return nil, fmt.Errorf("VulnerabilityWorker: failed to initialize AWS credentials: %w", err)
	}

	provider := ec2.NewCurrentRegionEC2Provider(ctx, log, "", *awsConfig, &awslib.MultiRegionClientFactory[ec2.Client]{})
	manager := NewSnapshotManager(log, provider)
	fetcher := NewVulnerabilityFetcher(log, provider)
	replicator := NewVulnerabilityReplicator(log, manager)
	verifier := NewVulnerabilityVerifier(log, provider)

	runner, err := NewVulnerabilityRunner(ctx, log)
	if err != nil {
		return nil, fmt.Errorf("VulnerabilityWorker: could not get init NewVulnerabilityRunner: %w", err)
	}

	// TODO: Replace sequence with more generic approach
	evaluator, err := NewVulnerabilityScanner(log, runner, manager, c, time.Now())
	if err != nil {
		// The runner exists at this point but no worker will be returned to
		// own it, so Run's deferred Close will never happen. Release it here.
		if closeErr := runner.Close(ctx); closeErr != nil {
			log.Warnf("error during runner closing %s", closeErr.Error())
		}
		return nil, fmt.Errorf("VulnerabilityWorker: could not get init NewVulnerabilityScanner: %w", err)
	}

	eventsCreator := NewEventsCreator(log, c, bdp, cdp)

	return &VulnerabilityWorker{
		log:           log,
		fetcher:       fetcher,
		replicator:    replicator,
		verifier:      verifier,
		evaluator:     evaluator,
		runner:        runner,
		eventsCreator: eventsCreator,
		manager:       manager,
	}, nil
}
// Run executes the vulnerability pipeline once and blocks until every stage
// has returned.
//
// The stages (old-snapshot deletion, instance fetching, snapshotting,
// verification, scanning and event creation) are started concurrently, one
// goroutine per stage. Each consuming stage is handed the channel of the
// stage before it. Stage errors are logged and not returned.
//
// The runner is closed when Run returns, including when ctx is already
// cancelled on entry; in that case no stage is started and the manager's
// Cleanup is not scheduled.
func (f *VulnerabilityWorker) Run(ctx context.Context) {
	f.log.Info("Starting VulnerabilityWorker.work")
	defer func() {
		if err := f.runner.Close(ctx); err != nil {
			f.log.Warnf("error during runner closing %s", err.Error())
		}
	}()

	if ctx.Err() != nil {
		f.log.Info("VulnerabilityWorker.work context canceled")
		return
	}

	// Registered after the runner's Close, so Cleanup runs first on return.
	defer f.manager.Cleanup(ctx)

	// stage is one named unit of pipeline work.
	type stage struct {
		name string
		fn   func(ctx context.Context) error
	}

	// The GetChan calls stay inside the closures so that each channel is
	// looked up when the stage actually starts, not while building the list.
	stages := []stage{
		{
			name: "DeleteOldSnapshots",
			fn: func(ctx context.Context) error {
				f.manager.DeleteOldSnapshots(ctx)
				return nil
			},
		},
		{
			name: "FetchInstances",
			fn:   f.fetcher.FetchInstances,
		},
		{
			name: "SnapshotInstance",
			fn: func(ctx context.Context) error {
				f.replicator.SnapshotInstance(ctx, f.fetcher.GetChan())
				return nil
			},
		},
		{
			name: "VerifySnapshot",
			fn: func(ctx context.Context) error {
				f.verifier.VerifySnapshot(ctx, f.replicator.GetChan())
				return nil
			},
		},
		{
			name: "ScanSnapshot",
			fn: func(ctx context.Context) error {
				f.evaluator.ScanSnapshot(ctx, f.verifier.GetChan())
				return nil
			},
		},
		{
			name: "CreateEvents",
			fn: func(ctx context.Context) error {
				f.eventsCreator.CreateEvents(ctx, f.evaluator.GetChan())
				return nil
			},
		},
	}

	var wg sync.WaitGroup
	wg.Add(len(stages))
	for _, st := range stages {
		// Pass the stage as an argument instead of capturing the range
		// variable: with pre-Go 1.22 loop semantics a captured variable is
		// shared by all iterations and the goroutines could all observe the
		// last stage. An explicit argument is correct under either semantics.
		go func(st stage) {
			defer wg.Done()
			if err := st.fn(ctx); err != nil {
				f.log.Errorf("VulnerabilityWorker.work job %s failed: %s", st.name, err.Error())
				return
			}
			f.log.Infof("VulnerabilityWorker.work job %s finished", st.name)
		}(st)
	}

	f.log.Info("VulnerabilityWorker.work waiting on workers")
	wg.Wait()
	f.log.Info("VulnerabilityWorker.work finished waiting on workers")
}
// GetChan returns the channel obtained from the events creator's GetChan,
// which carries slices of beat.Event.
func (f *VulnerabilityWorker) GetChan() chan []beat.Event {
	events := f.eventsCreator.GetChan()
	return events
}