func()

in x-pack/filebeat/input/streaming/crowdstrike.go [129:256]


func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client, state map[string]any) (map[string]any, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.discoverURL, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to prepare discover stream request: %w", err)
	}
	resp, err := cli.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed GET to discover stream: %w", err)
	}
	defer resp.Body.Close()

	dec := json.NewDecoder(resp.Body)

	type resource struct {
		FeedURL string `json:"dataFeedURL"`
		Session struct {
			Token   string    `json:"token"`
			Expires time.Time `json:"expiration"`
		} `json:"sessionToken"`
		RefreshURL   string `json:"refreshActiveSessionURL"`
		RefreshAfter int    `json:"refreshActiveSessionInterval"`
	}
	var body struct {
		Resources []resource     `json:"resources"`
		Meta      map[string]any `json:"meta"`
	}
	err = dec.Decode(&body)
	if err != nil {
		return state, Warning{fmt.Errorf("failed to decode discover body: %w", err)}
	}
	s.log.Debugw("stream discover metadata", logp.Namespace(s.ns), "meta", mapstr.M(body.Meta))

	var offset int
	if cursor, ok := state["cursor"].(map[string]any); ok {
		switch off := cursor["offset"].(type) {
		case int:
			offset = off
		case float64:
			offset = int(off)
		}
	}

	for _, r := range body.Resources {
		refreshAfter := time.Duration(r.RefreshAfter) * time.Second
		go func() {
			const grace = 5 * time.Minute
			for {
				select {
				case <-ctx.Done():
					return
				case <-time.After(refreshAfter - grace):
					s.log.Debugw("session refresh", "url", r.RefreshURL)
					req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.RefreshURL, nil)
					if err != nil {
						s.metrics.errorsTotal.Inc()
						s.log.Errorw("failed to prepare refresh stream request", "error", err)
						return
					}
					req.Header.Set("Content-Type", "application/json")
					resp, err := cli.Do(req)
					if err != nil {
						s.metrics.errorsTotal.Inc()
						s.log.Errorw("failed to refresh stream connection", "error", err)
						return
					}
					err = resp.Body.Close()
					if err != nil {
						s.metrics.errorsTotal.Inc()
						s.log.Warnw("failed to close refresh response body", "error", err)
					}
				}
			}
		}()

		if offset > 0 {
			feedURL, err := url.Parse(r.FeedURL)
			if err != nil {
				return state, Warning{fmt.Errorf("failed to parse feed url: %w", err)}
			}
			feedQuery, err := url.ParseQuery(feedURL.RawQuery)
			if err != nil {
				return state, Warning{fmt.Errorf("failed to parse feed query: %w", err)}
			}
			feedQuery.Set("offset", strconv.Itoa(offset))
			feedURL.RawQuery = feedQuery.Encode()
			r.FeedURL = feedURL.String()
		}

		s.log.Debugw("stream request", "url", r.FeedURL)
		req, err := http.NewRequestWithContext(ctx, "GET", r.FeedURL, nil)
		if err != nil {
			return state, Warning{fmt.Errorf("failed to make firehose request to %s: %w", r.FeedURL, err)}
		}
		req.Header = make(http.Header)
		req.Header.Add("Accept", "application/json")
		req.Header.Add("Authorization", "Token "+r.Session.Token)

		resp, err := s.plainClient.Do(req)
		if err != nil {
			return state, Warning{fmt.Errorf("failed to get firehose from %s: %w", r.FeedURL, err)}
		}
		defer resp.Body.Close()

		dec := json.NewDecoder(resp.Body)
		for {
			var msg json.RawMessage
			err := dec.Decode(&msg)
			if err != nil {
				s.metrics.errorsTotal.Inc()
				//nolint:errorlint // will not be a wrapped error here.
				if err == io.EOF {
					s.log.Info("stream ended, restarting")
					return state, nil
				}
				return state, Warning{fmt.Errorf("error decoding event: %w", err)}
			}
			s.metrics.receivedBytesTotal.Add(uint64(len(msg)))
			state["response"] = []byte(msg)
			s.log.Debugw("received firehose message", logp.Namespace(s.ns), "msg", debugMsg(msg))
			err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
			if err != nil {
				s.log.Errorw("failed to process and publish data", "error", err)
				return nil, err
			}
		}
	}
	return state, nil
}