in providers/fireeye/api/threat.go [29:87]
// FetchAllThreatReportsSince streams the threat reports selected by the
// parameters built from since.
//
// The work is a two-stage pipeline:
//
//  1. The query parameters are split with parameters.batchBy(ninetyDays) and
//     the report IDs of each batch are fetched concurrently.
//  2. Each report ID is resolved to a full report concurrently and delivered
//     on the returned channel.
//
// A non-nil error is returned only when the parameters fail validation; in
// that case the channel is nil. Failures while fetching IDs or individual
// reports are logged and the affected items are skipped. The returned channel
// is closed once every worker has finished.
//
// The function returns as soon as the pipeline has been started. Every
// channel send also watches ctx, so cancelling ctx (for example because the
// consumer stopped reading) releases all goroutines instead of leaving them
// blocked on a send forever.
func (c *Client) FetchAllThreatReportsSince(ctx context.Context, since int64) (<-chan *schema.Report, error) {
	parameters := newParametersSince(since)
	if err := parameters.validate(); err != nil {
		return nil, err
	}

	// Stage 1: fetch report IDs, one goroutine per batch.
	reportIDs := make(chan string)
	var wgReportIDs sync.WaitGroup
	for _, params := range parameters.batchBy(ninetyDays) {
		wgReportIDs.Add(1)
		params := params // per-iteration copy for the closure (pre-Go 1.22 semantics)
		go func() {
			defer wgReportIDs.Done()
			flog.Infof("Fetching: %s\n", params)
			rIDs, err := c.fetchReportIDs(ctx, params)
			if err != nil {
				flog.Errorln(err)
				return
			}
			for _, rID := range rIDs {
				select {
				case reportIDs <- rID:
				case <-ctx.Done():
					// Nobody will need further IDs; stop producing.
					return
				}
			}
		}()
	}
	// Close reportIDs once all producers are done so stage 2 terminates.
	go func() {
		wgReportIDs.Wait()
		close(reportIDs)
	}()

	// Stage 2: fetch the full report for every ID. This runs in its own
	// goroutine so the caller receives the channel immediately and can start
	// consuming while IDs are still being discovered.
	reports := make(chan *schema.Report)
	go func() {
		var wgReports sync.WaitGroup
		for rID := range reportIDs {
			wgReports.Add(1)
			go func(rID string) {
				defer wgReports.Done()
				report, err := c.fetchReport(ctx, rID)
				if err != nil {
					stats.IncrementCounter("report.error")
					flog.Errorln(err)
					return
				}
				stats.IncrementCounter("report.success")
				select {
				case reports <- report:
				case <-ctx.Done():
					// Consumer is gone; drop the report rather than leak.
				}
			}(rID)
		}
		// All Add calls happened above in this goroutine, so Wait cannot
		// race with them. Only after every sender is done is it safe to
		// close the output channel.
		wgReports.Wait()
		close(reports)
	}()

	return reports, nil
}