func()

in providers/fireeye/api/threat.go [29:87]


func (c *Client) FetchAllThreatReportsSince(ctx context.Context, since int64) (<-chan *schema.Report, error) {
	parameters := newParametersSince(since)
	if err := parameters.validate(); err != nil {
		return nil, err
	}

	// fetch indexes

	reportIDs := make(chan string)
	wgReportIDs := sync.WaitGroup{}

	for _, params := range parameters.batchBy(ninetyDays) {
		wgReportIDs.Add(1)
		params := params
		go func() {
			defer wgReportIDs.Done()
			flog.Infof("Fetching: %s\n", params)
			if rIDs, err := c.fetchReportIDs(ctx, params); err == nil {
				for _, rID := range rIDs {
					reportIDs <- rID
				}
			} else {
				flog.Errorln(err)
			}
		}()
	}

	go func() {
		wgReportIDs.Wait()
		close(reportIDs)
	}()

	// fetch reports

	reports := make(chan *schema.Report)
	wgReports := sync.WaitGroup{}

	for rID := range reportIDs {
		wgReports.Add(1)
		rID := rID
		go func() {
			defer wgReports.Done()
			if report, err := c.fetchReport(ctx, rID); err == nil {
				stats.IncrementCounter("report.success")
				reports <- report
			} else {
				stats.IncrementCounter("report.error")
				flog.Errorln(err)
			}
		}()
	}

	go func() {
		wgReports.Wait()
		close(reports)
	}()

	return reports, nil
}