func getGlobalCheckpoints()

in x-pack/apm-server/sampling/pubsub/checkpoints.go [19:63]


func getGlobalCheckpoints(
	ctx context.Context,
	client *elastictransport.Client,
	dataStream string,
) (map[string]int64, error) {
	indexGlobalCheckpoints := make(map[string]int64)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "/"+dataStream+"/_stats/get?level=shards", nil)
	if err != nil {
		return nil, fmt.Errorf("failed to created indices request: %w", err)
	}
	resp, err := client.Perform(req)
	if err != nil {
		return nil, fmt.Errorf("index stats request failed: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode > 299 {
		switch resp.StatusCode {
		case http.StatusNotFound:
			// Data stream does not yet exist.
			return indexGlobalCheckpoints, nil
		}
		message, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("index stats request failed: %s", message)
	}

	var stats dataStreamStats
	if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
		return nil, err
	}

	for index, indexStats := range stats.Indices {
		if n := len(indexStats.Shards); n > 1 {
			return nil, fmt.Errorf("expected 1 shard, got %d for index %q", n, index)
		}
		for _, shardStats := range indexStats.Shards {
			for _, shardStats := range shardStats {
				if shardStats.Routing.Primary {
					indexGlobalCheckpoints[index] = shardStats.SeqNo.GlobalCheckpoint
					break
				}
			}
		}
	}
	return indexGlobalCheckpoints, nil
}