providers/fireeye/api/threat.go (84 lines of code) (raw):

// Copyright (c) Facebook, Inc. and its affiliates. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package api import ( "context" "encoding/json" "fmt" "sync" "github.com/facebookincubator/flog" "github.com/facebookincubator/nvdtools/providers/fireeye/schema" "github.com/facebookincubator/nvdtools/stats" ) // FetchAllThreatReportsSince will fetch all vulnerabilities with specified parameters func (c *Client) FetchAllThreatReportsSince(ctx context.Context, since int64) (<-chan *schema.Report, error) { parameters := newParametersSince(since) if err := parameters.validate(); err != nil { return nil, err } // fetch indexes reportIDs := make(chan string) wgReportIDs := sync.WaitGroup{} for _, params := range parameters.batchBy(ninetyDays) { wgReportIDs.Add(1) params := params go func() { defer wgReportIDs.Done() flog.Infof("Fetching: %s\n", params) if rIDs, err := c.fetchReportIDs(ctx, params); err == nil { for _, rID := range rIDs { reportIDs <- rID } } else { flog.Errorln(err) } }() } go func() { wgReportIDs.Wait() close(reportIDs) }() // fetch reports reports := make(chan *schema.Report) wgReports := sync.WaitGroup{} for rID := range reportIDs { wgReports.Add(1) rID := rID go func() { defer wgReports.Done() if report, err := c.fetchReport(ctx, rID); err == nil { stats.IncrementCounter("report.success") reports <- report } else { stats.IncrementCounter("report.error") flog.Errorln(err) } }() } go func() { wgReports.Wait() close(reports) }() return reports, nil } func (c *Client) fetchReportIDs(ctx context.Context, parameters timeRangeParameters) ([]string, error) { resp, err := c.Request(ctx, fmt.Sprintf("/report/index?intelligenceType=threat&%s", parameters.query())) if err != nil { return nil, err } var reportIndex []*schema.ReportIndexItem if err := json.NewDecoder(resp).Decode(&reportIndex); err != nil { return nil, err } reportIDs := make([]string, len(reportIndex)) for i := 0; i < len(reportIndex); i++ { reportIDs[i] = reportIndex[i].ReportID } return reportIDs, nil } func (c *Client) fetchReport(ctx context.Context, reportID string) (*schema.Report, error) { resp, err := c.Request(ctx, fmt.Sprintf("/report/%s?detail=full", reportID)) if err != nil { return nil, err } var wrapper schema.ReportWrapper if err := json.NewDecoder(resp).Decode(&wrapper); err != nil { return nil, err } return &wrapper.Report, nil }