in providers/flexera/api/client.go [61:122]
func (c *Client) FetchAllVulnerabilities(ctx context.Context, since int64) (<-chan runner.Convertible, error) {
from, to := since, time.Now().Unix()
totalAdvisories, err := c.getNumberOfAdvisories(ctx, from, to)
if err != nil {
return nil, errors.Wrap(err, "failed to get total number of advisories")
}
mainCtx, cancel := context.WithCancel(ctx)
numPages := (totalAdvisories-1)/pageSize + 1
flog.Infof("starting sync for %d advisories over %d pages\n", totalAdvisories, numPages)
identifiers := make(chan string, totalAdvisories)
advisories := make(chan runner.Convertible, totalAdvisories)
identifersEg, identifiersCtx := errgroup.WithContext(mainCtx)
for page := 0; page < numPages; page++ {
p := page + 1
identifersEg.Go(func() error {
list, err := c.fetchAdvisoryList(identifiersCtx, from, to, p)
if err != nil {
return client.StopOrContinue(errors.Wrapf(err, "failed to fetch page %d advisory list", p))
}
for _, element := range list.Results {
identifiers <- element.AdvisoryIdentifier
}
return nil
})
}
go func() {
if err := identifersEg.Wait(); err != nil {
flog.Errorln(err)
cancel()
}
close(identifiers)
}()
advisoriesEg, advisoriesCtx := errgroup.WithContext(mainCtx)
for i := 0; i < numFetchers; i++ {
advisoriesEg.Go(func() error {
for identifier := range identifiers {
advisory, err := c.Fetch(advisoriesCtx, identifier)
if err != nil {
return client.StopOrContinue(errors.Wrapf(err, "failed to fetch advisory %s", identifier))
}
advisories <- advisory
}
return nil
})
}
go func() {
if err := advisoriesEg.Wait(); err != nil {
flog.Errorln(err)
cancel()
}
close(advisories)
}()
return advisories, nil
}