func()

in providers/flexera/api/client.go [61:122]


func (c *Client) FetchAllVulnerabilities(ctx context.Context, since int64) (<-chan runner.Convertible, error) {
	from, to := since, time.Now().Unix()
	totalAdvisories, err := c.getNumberOfAdvisories(ctx, from, to)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get total number of advisories")
	}

	mainCtx, cancel := context.WithCancel(ctx)

	numPages := (totalAdvisories-1)/pageSize + 1
	flog.Infof("starting sync for %d advisories over %d pages\n", totalAdvisories, numPages)

	identifiers := make(chan string, totalAdvisories)
	advisories := make(chan runner.Convertible, totalAdvisories)

	identifersEg, identifiersCtx := errgroup.WithContext(mainCtx)
	for page := 0; page < numPages; page++ {
		p := page + 1
		identifersEg.Go(func() error {
			list, err := c.fetchAdvisoryList(identifiersCtx, from, to, p)
			if err != nil {
				return client.StopOrContinue(errors.Wrapf(err, "failed to fetch page %d advisory list", p))
			}
			for _, element := range list.Results {
				identifiers <- element.AdvisoryIdentifier
			}
			return nil
		})
	}

	go func() {
		if err := identifersEg.Wait(); err != nil {
			flog.Errorln(err)
			cancel()
		}
		close(identifiers)
	}()

	advisoriesEg, advisoriesCtx := errgroup.WithContext(mainCtx)
	for i := 0; i < numFetchers; i++ {
		advisoriesEg.Go(func() error {
			for identifier := range identifiers {
				advisory, err := c.Fetch(advisoriesCtx, identifier)
				if err != nil {
					return client.StopOrContinue(errors.Wrapf(err, "failed to fetch advisory %s", identifier))
				}
				advisories <- advisory
			}
			return nil
		})
	}

	go func() {
		if err := advisoriesEg.Wait(); err != nil {
			flog.Errorln(err)
			cancel()
		}
		close(advisories)
	}()

	return advisories, nil
}