func()

in x-pack/apm-server/sampling/pubsub/pubsub.go [177:230]


// searchTraceIDs runs one search pass over the indices reported by
// getGlobalCheckpoints for the configured data stream, calling
// searchIndexTraceIDs concurrently for every index whose global checkpoint
// has advanced past the last observed _seq_no.
//
// observedSeqnos maps index name to the highest _seq_no observed so far, and
// is updated in place: entries for indices that no longer have a global
// checkpoint are removed, and entries are raised to the maximum _seq_no
// returned by the per-index search. out is passed through to
// searchIndexTraceIDs (presumably the channel on which found trace IDs are
// delivered -- confirm against that method).
//
// The returned bool reports whether any per-index search advanced its entry
// in observedSeqnos. The returned error is the first error from fetching the
// global checkpoints, refreshing the indices, or any per-index search.
func (p *Pubsub) searchTraceIDs(ctx context.Context, out chan<- string, observedSeqnos map[string]int64) (bool, error) {
	globalCheckpoints, err := getGlobalCheckpoints(ctx, p.config.Client, p.config.DataStream.String())
	if err != nil {
		return false, err
	}

	// Remove old indices from the observed _seq_no map.
	for index := range observedSeqnos {
		if _, ok := globalCheckpoints[index]; !ok {
			delete(observedSeqnos, index)
		}
	}

	// Force-refresh the indices with updated global checkpoints. Indices
	// whose checkpoint has not moved past what we already observed are
	// dropped from globalCheckpoints and skipped entirely.
	indices := make([]string, 0, len(globalCheckpoints))
	for index, globalCheckpoint := range globalCheckpoints {
		observedSeqno, ok := observedSeqnos[index]
		if ok && globalCheckpoint <= observedSeqno {
			delete(globalCheckpoints, index)
			continue
		}
		indices = append(indices, index)
	}
	if err := p.refreshIndices(ctx, indices); err != nil {
		return false, err
	}

	// changed and observedSeqnos are written by the search goroutines below;
	// both are guarded by observedSeqnosMu.
	var changed bool
	var observedSeqnosMu sync.Mutex
	g, ctx := errgroup.WithContext(ctx)
	for _, index := range indices {
		globalCheckpoint := globalCheckpoints[index]
		observedSeqno, ok := observedSeqnos[index]
		if !ok {
			// Nothing observed yet for this index: start below the
			// first valid _seq_no.
			observedSeqno = -1
		}
		index := index // copy for closure
		g.Go(func() error {
			maxSeqno, err := p.searchIndexTraceIDs(ctx, out, index, observedSeqno, globalCheckpoint)
			if err != nil {
				return err
			}
			if maxSeqno > observedSeqno {
				observedSeqnosMu.Lock()
				observedSeqnos[index] = maxSeqno
				changed = true
				observedSeqnosMu.Unlock()
			}
			return nil
		})
	}

	// Wait for every search goroutine before reading changed. Writing
	// "return changed, g.Wait()" would leave the order of the variable read
	// relative to the Wait call unspecified, so changed could be read stale
	// (and racily) while goroutines are still running.
	err = g.Wait()
	return changed, err
}