func()

in internal/testrunner/runners/system/tester.go [733:812]


// getDocs searches dataStream and collects the pieces of the response the
// caller works with: the _source and fields of every returned hit, the keys
// of the ignored_fields aggregation buckets, and the raw hits of the
// ignored_docs aggregation.
//
// The search is sorted by @timestamp ascending, limited to
// elasticsearchQuerySize results, sends checkFieldsBody as the request body
// and ignores unavailable indices.
//
// An empty result (not an error) is returned when Elasticsearch answers with
// 503 and a no_shard_available_action_exception. Any other error response, a
// transport failure, or an undecodable body is returned as an error.
func (r *tester) getDocs(ctx context.Context, dataStream string) (*hits, error) {
	searchAPI := r.esAPI.Search
	resp, err := searchAPI(
		searchAPI.WithContext(ctx),
		searchAPI.WithIndex(dataStream),
		searchAPI.WithSort("@timestamp:asc"),
		searchAPI.WithSize(elasticsearchQuerySize),
		searchAPI.WithSource("true"),
		searchAPI.WithBody(strings.NewReader(checkFieldsBody)),
		searchAPI.WithIgnoreUnavailable(true),
	)
	if err != nil {
		return nil, fmt.Errorf("could not search data stream: %w", err)
	}
	defer resp.Body.Close()

	switch {
	case resp.StatusCode == http.StatusServiceUnavailable &&
		strings.Contains(resp.String(), "no_shard_available_action_exception"):
		// Index is being created, but no shards are available yet.
		// See https://github.com/elastic/elasticsearch/issues/65846
		return &hits{}, nil
	case resp.IsError():
		return nil, fmt.Errorf("failed to search docs for data stream %s: %s", dataStream, resp.String())
	}

	// Shapes of the parts of the search response that are decoded below.
	type searchHit struct {
		Source common.MapStr `json:"_source"`
		Fields common.MapStr `json:"fields"`
	}
	type ignoredBucket struct {
		Key string `json:"key"`
	}
	type searchFailure struct {
		Type   string
		Reason string
	}

	var payload struct {
		Hits struct {
			Total struct {
				Value int
			}
			Hits []searchHit
		}
		Aggregations struct {
			AllIgnored struct {
				DocCount      int `json:"doc_count"`
				IgnoredFields struct {
					Buckets []ignoredBucket `json:"buckets"`
				} `json:"ignored_fields"`
				IgnoredDocs struct {
					Hits struct {
						Hits []common.MapStr `json:"hits"`
					} `json:"hits"`
				} `json:"ignored_docs"`
			} `json:"all_ignored"`
		} `json:"aggregations"`
		Error  *searchFailure
		Status int
	}

	decodeErr := json.NewDecoder(resp.Body).Decode(&payload)
	if decodeErr != nil {
		return nil, fmt.Errorf("could not decode search results response: %w", decodeErr)
	}

	total := payload.Hits.Total.Value
	if failure := payload.Error; failure == nil {
		logger.Debugf("found %d hits in %s data stream", total, dataStream)
	} else {
		// The body carried an error object even though the status check passed;
		// it is only logged, not returned.
		logger.Debugf("found %d hits in %s data stream: %s: %s Status=%d",
			total, dataStream, failure.Type, failure.Reason, payload.Status)
	}

	docs := new(hits)
	for i := range payload.Hits.Hits {
		doc := payload.Hits.Hits[i]
		docs.Source = append(docs.Source, doc.Source)
		docs.Fields = append(docs.Fields, doc.Fields)
	}

	ignored := payload.Aggregations.AllIgnored
	for _, b := range ignored.IgnoredFields.Buckets {
		docs.IgnoredFields = append(docs.IgnoredFields, b.Key)
	}
	docs.DegradedDocs = ignored.IgnoredDocs.Hits.Hits

	return docs, nil
}