in x-pack/apm-server/sampling/pubsub/pubsub.go [256:332]
// searchIndexTraceIDs pages through the documents of index in ascending
// _seq_no order and sends the trace ID of every hit to out.
//
// Only documents with a sequence number no greater than maxSeqno are
// requested; when minSeqno is non-negative, documents at or below it are
// excluded as well. Documents whose agent.ephemeral_id equals
// p.config.ServerID are never requested. Paging continues, 1000 hits at a
// time, until a hit with sequence number maxSeqno or greater has been seen,
// a search returns no hits, or the search reports errIndexNotFound.
//
// It returns the sequence number of the last hit that was sent, or -1 if no
// hits were found. On a marshalling or search error, or when ctx is done
// while sending to out, it returns -1 together with the error.
func (p *Pubsub) searchIndexTraceIDs(ctx context.Context, out chan<- string, index string, minSeqno, maxSeqno int64) (int64, error) {
	const pageSize = 1000

	// The following query fragments are the same for every page, so they
	// are built once up front.

	// Up to and including the new global checkpoint.
	upperBound := map[string]interface{}{
		"range": map[string]interface{}{
			"_seq_no": map[string]interface{}{
				"lte": maxSeqno,
			},
		},
	}
	// Exclude observations published by this server.
	excludeLocal := map[string]interface{}{
		"term": map[string]interface{}{
			"agent.ephemeral_id": map[string]interface{}{
				"value": p.config.ServerID,
			},
		},
	}
	ascendingSeqno := []interface{}{map[string]interface{}{"_seq_no": "asc"}}

	lastSeqno := int64(-1)
	for lastSeqno < maxSeqno {
		// The lower bound moves forward with every page: initially it is
		// the old global checkpoint (if any), afterwards the sequence
		// number of the last hit of the previous page.
		filters := []map[string]interface{}{upperBound}
		if minSeqno >= 0 {
			lowerBound := map[string]interface{}{
				"range": map[string]interface{}{
					"_seq_no": map[string]interface{}{
						"gt": minSeqno,
					},
				},
			}
			filters = append(filters, lowerBound)
		}

		body, err := json.Marshal(map[string]interface{}{
			"size":                pageSize,
			"sort":                ascendingSeqno,
			"seq_no_primary_term": true,
			"track_total_hits":    false,
			"query": map[string]interface{}{
				"bool": map[string]interface{}{
					"must_not": excludeLocal,
					"filter":   filters,
				},
			},
		})
		if err != nil {
			return -1, err
		}

		// A fresh response value is used for every page so that nothing
		// from a previous page can leak into the current one.
		var response struct {
			Hits struct {
				Hits []struct {
					Seqno  int64           `json:"_seq_no"`
					Source traceIDDocument `json:"_source"`
					Sort   []interface{}   `json:"sort"`
				}
			}
		}
		err = p.doSearchRequest(ctx, index, bytes.NewReader(body), &response)
		if err == errIndexNotFound {
			// The index was deleted; report what has been seen so far.
			return lastSeqno, nil
		}
		if err != nil {
			return -1, err
		}

		hits := response.Hits.Hits
		if len(hits) == 0 {
			return lastSeqno, nil
		}
		for i := range hits {
			select {
			case <-ctx.Done():
				return -1, ctx.Err()
			case out <- hits[i].Source.Trace.ID:
			}
		}

		lastSeqno = hits[len(hits)-1].Seqno
		minSeqno = lastSeqno
	}
	return lastSeqno, nil
}