in internal/testrunner/runners/system/tester.go [733:812]
func (r *tester) getDocs(ctx context.Context, dataStream string) (*hits, error) {
resp, err := r.esAPI.Search(
r.esAPI.Search.WithContext(ctx),
r.esAPI.Search.WithIndex(dataStream),
r.esAPI.Search.WithSort("@timestamp:asc"),
r.esAPI.Search.WithSize(elasticsearchQuerySize),
r.esAPI.Search.WithSource("true"),
r.esAPI.Search.WithBody(strings.NewReader(checkFieldsBody)),
r.esAPI.Search.WithIgnoreUnavailable(true),
)
if err != nil {
return nil, fmt.Errorf("could not search data stream: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusServiceUnavailable && strings.Contains(resp.String(), "no_shard_available_action_exception") {
// Index is being created, but no shards are available yet.
// See https://github.com/elastic/elasticsearch/issues/65846
return &hits{}, nil
}
if resp.IsError() {
return nil, fmt.Errorf("failed to search docs for data stream %s: %s", dataStream, resp.String())
}
var results struct {
Hits struct {
Total struct {
Value int
}
Hits []struct {
Source common.MapStr `json:"_source"`
Fields common.MapStr `json:"fields"`
}
}
Aggregations struct {
AllIgnored struct {
DocCount int `json:"doc_count"`
IgnoredFields struct {
Buckets []struct {
Key string `json:"key"`
} `json:"buckets"`
} `json:"ignored_fields"`
IgnoredDocs struct {
Hits struct {
Hits []common.MapStr `json:"hits"`
} `json:"hits"`
} `json:"ignored_docs"`
} `json:"all_ignored"`
} `json:"aggregations"`
Error *struct {
Type string
Reason string
}
Status int
}
if err := json.NewDecoder(resp.Body).Decode(&results); err != nil {
return nil, fmt.Errorf("could not decode search results response: %w", err)
}
numHits := results.Hits.Total.Value
if results.Error != nil {
logger.Debugf("found %d hits in %s data stream: %s: %s Status=%d",
numHits, dataStream, results.Error.Type, results.Error.Reason, results.Status)
} else {
logger.Debugf("found %d hits in %s data stream", numHits, dataStream)
}
var hits hits
for _, hit := range results.Hits.Hits {
hits.Source = append(hits.Source, hit.Source)
hits.Fields = append(hits.Fields, hit.Fields)
}
for _, bucket := range results.Aggregations.AllIgnored.IgnoredFields.Buckets {
hits.IgnoredFields = append(hits.IgnoredFields, bucket.Key)
}
hits.DegradedDocs = results.Aggregations.AllIgnored.IgnoredDocs.Hits.Hits
return &hits, nil
}