in x-pack/filebeat/input/streaming/crowdstrike.go [129:256]
// followSession runs one discover/stream cycle: it GETs s.discoverURL with
// cli, decodes the advertised stream resources, starts a background session
// refresher for the resource being followed, and then reads JSON messages
// from the resource's feed URL using s.plainClient, handing each message to
// s.process via state["response"].
//
// If state holds a cursor map with a numeric "offset" greater than zero, the
// offset is added to the feed URL query so the stream resumes from there.
//
// The inner read loop only exits by returning, so only the first element of
// the discovered resources is ever followed.
//
// Return values:
//   - (state, nil) when the feed reaches io.EOF or no resources were listed,
//   - (state, Warning{...}) when the discover body cannot be decoded, the
//     feed URL cannot be parsed or requested, or a message fails to decode,
//   - (nil, err) when the discover request cannot be built or sent, or when
//     s.process fails.
//
// The session refresher is bound to the lifetime of this call: it is stopped
// when followSession returns, so repeated calls do not accumulate refreshers
// for sessions that are no longer being read.
func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client, state map[string]any) (map[string]any, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.discoverURL, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to prepare discover stream request: %w", err)
	}
	resp, err := cli.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed GET to discover stream: %w", err)
	}
	defer resp.Body.Close()

	type resource struct {
		FeedURL string `json:"dataFeedURL"`
		Session struct {
			Token   string    `json:"token"`
			Expires time.Time `json:"expiration"`
		} `json:"sessionToken"`
		RefreshURL   string `json:"refreshActiveSessionURL"`
		RefreshAfter int    `json:"refreshActiveSessionInterval"`
	}
	var body struct {
		Resources []resource     `json:"resources"`
		Meta      map[string]any `json:"meta"`
	}
	err = json.NewDecoder(resp.Body).Decode(&body)
	if err != nil {
		return state, Warning{fmt.Errorf("failed to decode discover body: %w", err)}
	}
	s.log.Debugw("stream discover metadata", logp.Namespace(s.ns), "meta", mapstr.M(body.Meta))

	// The cursor offset may be an int (set in-process) or a float64 (after a
	// round trip through JSON); any other type leaves the offset at zero.
	var offset int
	if cursor, ok := state["cursor"].(map[string]any); ok {
		switch off := cursor["offset"].(type) {
		case int:
			offset = off
		case float64:
			offset = int(off)
		}
	}

	// Tie the refresher to this call rather than to the caller's context so
	// it does not outlive the session it is refreshing.
	refreshCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	for _, r := range body.Resources {
		const grace = 5 * time.Minute
		refreshURL := r.RefreshURL
		refreshAfter := time.Duration(r.RefreshAfter) * time.Second

		// Refresh a grace period ahead of the advertised interval. When the
		// interval is not longer than the grace period, subtracting would
		// give a non-positive wait and a timer that fires immediately in a
		// tight loop, so fall back to half the interval instead.
		wait := refreshAfter - grace
		if wait <= 0 {
			wait = refreshAfter / 2
		}
		if wait > 0 {
			go func() {
				timer := time.NewTimer(wait)
				defer timer.Stop()
				for {
					select {
					case <-refreshCtx.Done():
						return
					case <-timer.C:
					}
					s.log.Debugw("session refresh", "url", refreshURL)
					req, err := http.NewRequestWithContext(refreshCtx, http.MethodPost, refreshURL, nil)
					if err != nil {
						s.metrics.errorsTotal.Inc()
						s.log.Errorw("failed to prepare refresh stream request", "error", err)
						return
					}
					req.Header.Set("Content-Type", "application/json")
					resp, err := cli.Do(req)
					if err != nil {
						if refreshCtx.Err() != nil {
							// The session was torn down while the request
							// was in flight; this is not a refresh failure.
							return
						}
						s.metrics.errorsTotal.Inc()
						s.log.Errorw("failed to refresh stream connection", "error", err)
						return
					}
					err = resp.Body.Close()
					if err != nil {
						s.metrics.errorsTotal.Inc()
						s.log.Warnw("failed to close refresh response body", "error", err)
					}
					// The timer channel was drained above, so Reset is safe.
					timer.Reset(wait)
				}
			}()
		} else {
			// No usable interval was advertised; the session is left to
			// expire rather than being refreshed in a busy loop.
			s.log.Warnw("invalid session refresh interval, session will not be refreshed", "interval", refreshAfter.String())
		}

		if offset > 0 {
			feedURL, err := url.Parse(r.FeedURL)
			if err != nil {
				return state, Warning{fmt.Errorf("failed to parse feed url: %w", err)}
			}
			feedQuery, err := url.ParseQuery(feedURL.RawQuery)
			if err != nil {
				return state, Warning{fmt.Errorf("failed to parse feed query: %w", err)}
			}
			feedQuery.Set("offset", strconv.Itoa(offset))
			feedURL.RawQuery = feedQuery.Encode()
			r.FeedURL = feedURL.String()
		}

		s.log.Debugw("stream request", "url", r.FeedURL)
		req, err := http.NewRequestWithContext(ctx, "GET", r.FeedURL, nil)
		if err != nil {
			return state, Warning{fmt.Errorf("failed to make firehose request to %s: %w", r.FeedURL, err)}
		}
		// The feed is authenticated with the session token from the discover
		// response and is requested through s.plainClient rather than cli.
		req.Header = make(http.Header)
		req.Header.Add("Accept", "application/json")
		req.Header.Add("Authorization", "Token "+r.Session.Token)
		resp, err := s.plainClient.Do(req)
		if err != nil {
			return state, Warning{fmt.Errorf("failed to get firehose from %s: %w", r.FeedURL, err)}
		}
		// This loop body always returns, so the deferred close runs at the
		// end of this first (and only) iteration's work.
		defer resp.Body.Close()

		dec := json.NewDecoder(resp.Body)
		for {
			var msg json.RawMessage
			err := dec.Decode(&msg)
			if err != nil {
				s.metrics.errorsTotal.Inc()
				//nolint:errorlint // will not be a wrapped error here.
				if err == io.EOF {
					s.log.Info("stream ended, restarting")
					return state, nil
				}
				return state, Warning{fmt.Errorf("error decoding event: %w", err)}
			}
			s.metrics.receivedBytesTotal.Add(uint64(len(msg)))
			state["response"] = []byte(msg)
			s.log.Debugw("received firehose message", logp.Namespace(s.ns), "msg", debugMsg(msg))
			err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
			if err != nil {
				s.log.Errorw("failed to process and publish data", "error", err)
				return nil, err
			}
		}
	}
	return state, nil
}