in x-pack/apm-server/sampling/pubsub/checkpoints.go [19:63]
func getGlobalCheckpoints(
ctx context.Context,
client *elastictransport.Client,
dataStream string,
) (map[string]int64, error) {
indexGlobalCheckpoints := make(map[string]int64)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "/"+dataStream+"/_stats/get?level=shards", nil)
if err != nil {
return nil, fmt.Errorf("failed to created indices request: %w", err)
}
resp, err := client.Perform(req)
if err != nil {
return nil, fmt.Errorf("index stats request failed: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode > 299 {
switch resp.StatusCode {
case http.StatusNotFound:
// Data stream does not yet exist.
return indexGlobalCheckpoints, nil
}
message, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("index stats request failed: %s", message)
}
var stats dataStreamStats
if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
return nil, err
}
for index, indexStats := range stats.Indices {
if n := len(indexStats.Shards); n > 1 {
return nil, fmt.Errorf("expected 1 shard, got %d for index %q", n, index)
}
for _, shardStats := range indexStats.Shards {
for _, shardStats := range shardStats {
if shardStats.Routing.Primary {
indexGlobalCheckpoints[index] = shardStats.SeqNo.GlobalCheckpoint
break
}
}
}
}
return indexGlobalCheckpoints, nil
}