internal/vulnerability/fetcher.go (85 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package vulnerability import ( "context" "sort" "github.com/elastic/cloudbeat/internal/infra/clog" "github.com/elastic/cloudbeat/internal/resources/providers/awslib/ec2" ) type VulnerabilityFetcher struct { log *clog.Logger provider instancesProvider ch chan *ec2.Ec2Instance } type instancesProvider interface { DescribeInstances(ctx context.Context) ([]*ec2.Ec2Instance, error) DescribeVolumes(ctx context.Context, instances []*ec2.Ec2Instance) ([]*ec2.Volume, error) } func NewVulnerabilityFetcher(log *clog.Logger, provider instancesProvider) VulnerabilityFetcher { log.Debug("VulnerabilityFetcher: New") ch := make(chan *ec2.Ec2Instance) return VulnerabilityFetcher{ log: log, ch: ch, provider: provider, } } func (f VulnerabilityFetcher) FetchInstances(ctx context.Context) error { defer close(f.ch) f.log.Info("Starting VulnerabilityFetcher.FetchInstances") ins, err := f.provider.DescribeInstances(ctx) if err != nil { f.log.Errorf("VulnerabilityFetcher.FetchInstances DescribeInstances failed: %v", err) return err } f.log.Infof("VulnerabilityFetcher.FetchInstances found %d results", len(ins)) err = f.attachRootVolumes(ctx, ins) if err != nil { f.log.Errorf("VulnerabilityFetcher.FetchInstances attachRootVolumes failed: %v", err) } else { f.sortByRootVolumeSize(ins) } for _, in := range ins { select { case <-ctx.Done(): f.log.Info("VulnerabilityFetcher.FetchInstances context canceled") return nil case f.ch <- in: } } f.log.Info("VulnerabilityFetcher.FetchInstances finished") return nil } func (f VulnerabilityFetcher) attachRootVolumes(ctx context.Context, instances []*ec2.Ec2Instance) error { volumes, err := f.provider.DescribeVolumes(ctx, instances) if err != nil { return err } volumesMapping := make(map[string][]*ec2.Volume) for _, vol := range volumes { volumesMapping[vol.InstanceId] = append(volumesMapping[vol.InstanceId], vol) } for _, ins := range instances { instanceVolumes := volumesMapping[*ins.InstanceId] for _, vol := range instanceVolumes { if vol.Device == *ins.RootDeviceName { ins.RootVolume = vol break } } } return nil } func (f VulnerabilityFetcher) sortByRootVolumeSize(instances []*ec2.Ec2Instance) { sort.Slice(instances, func(i, j int) bool { if instances[i].RootVolume == nil { return false } if instances[j].RootVolume == nil { return true } return instances[i].RootVolume.Size < instances[j].RootVolume.Size }) } func (f VulnerabilityFetcher) GetChan() chan *ec2.Ec2Instance { return f.ch }