func()

in x-pack/apm-server/sampling/pubsub/pubsub.go [256:332]


// searchIndexTraceIDs pages through the documents of index whose sequence
// number is at most maxSeqno and, when minSeqno is non-negative, strictly
// greater than minSeqno. The trace ID of every hit is sent to out. The query
// excludes documents whose agent.ephemeral_id equals p.config.ServerID.
//
// Hits are requested in ascending _seq_no order, at most 1000 per request.
// After each page the exclusive lower bound is moved to the sequence number
// of the page's last hit. Paging ends when that sequence number reaches
// maxSeqno, when a request yields no hits, or when doSearchRequest reports
// errIndexNotFound.
//
// The returned value is the sequence number of the last hit processed, or -1
// when no hit was processed. If marshalling the request fails, the search
// fails with any other error, or ctx is done while sending to out, -1 is
// returned together with the error.
func (p *Pubsub) searchIndexTraceIDs(ctx context.Context, out chan<- string, index string, minSeqno, maxSeqno int64) (int64, error) {
	// seqnoRange builds a single range clause on _seq_no for the given operator.
	seqnoRange := func(op string, bound int64) map[string]interface{} {
		return map[string]interface{}{
			"range": map[string]interface{}{
				"_seq_no": map[string]interface{}{op: bound},
			},
		}
	}

	lastSeqno := int64(-1)
	lowerBound := minSeqno
	for lastSeqno < maxSeqno {
		// Upper bound is inclusive (new global checkpoint); the lower bound,
		// when present, is exclusive (old global checkpoint or last hit seen).
		rangeFilters := []map[string]interface{}{seqnoRange("lte", maxSeqno)}
		if lowerBound >= 0 {
			rangeFilters = append(rangeFilters, seqnoRange("gt", lowerBound))
		}

		// Skip observations published by this server.
		excludeLocal := map[string]interface{}{
			"term": map[string]interface{}{
				"agent.ephemeral_id": map[string]interface{}{
					"value": p.config.ServerID,
				},
			},
		}
		boolQuery := map[string]interface{}{
			"must_not": excludeLocal,
			"filter":   rangeFilters,
		}
		ascendingSeqno := []interface{}{map[string]interface{}{"_seq_no": "asc"}}

		payload, err := json.Marshal(map[string]interface{}{
			"size":                1000,
			"sort":                ascendingSeqno,
			"seq_no_primary_term": true,
			"track_total_hits":    false,
			"query":               map[string]interface{}{"bool": boolQuery},
		})
		if err != nil {
			return -1, err
		}

		// Declared per iteration so each page is decoded into a zero value.
		var page struct {
			Hits struct {
				Hits []struct {
					Seqno  int64           `json:"_seq_no"`
					Source traceIDDocument `json:"_source"`
					Sort   []interface{}   `json:"sort"`
				}
			}
		}
		switch err := p.doSearchRequest(ctx, index, bytes.NewReader(payload), &page); err {
		case nil:
			// Fall out of the switch and process the page.
		case errIndexNotFound:
			// The index was deleted; report whatever was observed so far.
			return lastSeqno, nil
		default:
			return -1, err
		}

		hits := page.Hits.Hits
		if len(hits) == 0 {
			return lastSeqno, nil
		}
		for i := range hits {
			select {
			case <-ctx.Done():
				return -1, ctx.Err()
			case out <- hits[i].Source.Trace.ID:
			}
		}

		// Continue from just after the last hit of this page.
		lastSeqno = hits[len(hits)-1].Seqno
		lowerBound = lastSeqno
	}
	return lastSeqno, nil
}