in x-pack/apm-server/sampling/pubsub/pubsub.go [177:230]
func (p *Pubsub) searchTraceIDs(ctx context.Context, out chan<- string, observedSeqnos map[string]int64) (bool, error) {
globalCheckpoints, err := getGlobalCheckpoints(ctx, p.config.Client, p.config.DataStream.String())
if err != nil {
return false, err
}
// Remove old indices from the observed _seq_no map.
for index := range observedSeqnos {
if _, ok := globalCheckpoints[index]; !ok {
delete(observedSeqnos, index)
}
}
// Force-refresh the indices with updated global checkpoints.
indices := make([]string, 0, len(globalCheckpoints))
for index, globalCheckpoint := range globalCheckpoints {
observedSeqno, ok := observedSeqnos[index]
if ok && globalCheckpoint <= observedSeqno {
delete(globalCheckpoints, index)
continue
}
indices = append(indices, index)
}
if err := p.refreshIndices(ctx, indices); err != nil {
return false, err
}
var changed bool
var observedSeqnosMu sync.Mutex
g, ctx := errgroup.WithContext(ctx)
for _, index := range indices {
globalCheckpoint := globalCheckpoints[index]
observedSeqno, ok := observedSeqnos[index]
if !ok {
observedSeqno = -1
}
index := index // copy for closure
g.Go(func() error {
maxSeqno, err := p.searchIndexTraceIDs(ctx, out, index, observedSeqno, globalCheckpoint)
if err != nil {
return err
}
if maxSeqno > observedSeqno {
observedSeqnosMu.Lock()
observedSeqno = maxSeqno
observedSeqnos[index] = observedSeqno
changed = true
observedSeqnosMu.Unlock()
}
return nil
})
}
return changed, g.Wait()
}