in internal/testrunner/runners/system/tester.go [1442:1539]
func (r *tester) waitForDocs(ctx context.Context, config *testConfig, dataStream string) (*hits, error) {
// Use custom timeout if the service can't collect data immediately.
waitForDataTimeout := waitForDataDefaultTimeout
if config.WaitForDataTimeout > 0 {
waitForDataTimeout = config.WaitForDataTimeout
}
if config.Assert.HitCount > elasticsearchQuerySize {
return nil, fmt.Errorf("invalid value for assert.hit_count (%d): it must be lower of the maximum query size (%d)", config.Assert.HitCount, elasticsearchQuerySize)
}
if config.Assert.MinCount > elasticsearchQuerySize {
return nil, fmt.Errorf("invalid value for assert.min_count (%d): it must be lower of the maximum query size (%d)", config.Assert.MinCount, elasticsearchQuerySize)
}
// (TODO in future) Optionally exercise service to generate load.
logger.Debugf("checking for expected data in data stream (%s)...", waitForDataTimeout)
var hits *hits
oldHits := 0
foundFields := map[string]any{}
passed, waitErr := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) {
var err error
hits, err = r.getDocs(ctx, dataStream)
if err != nil {
return false, err
}
defer func() {
oldHits = hits.size()
}()
assertHitCount := func() bool {
if config.Assert.HitCount == 0 {
// not enabled
return true
}
if hits.size() < config.Assert.HitCount {
return false
}
ret := hits.size() == oldHits
if !ret {
time.Sleep(4 * time.Second)
}
return ret
}()
assertFieldsPresent := func() bool {
if len(config.Assert.FieldsPresent) == 0 {
// not enabled
return true
}
if hits.size() == 0 {
// At least there should be one document ingested
return false
}
for _, f := range config.Assert.FieldsPresent {
if _, found := foundFields[f]; found {
continue
}
found := false
for _, d := range hits.Fields {
if _, err := d.GetValue(f); err == nil {
found = true
break
}
}
if !found {
return false
}
logger.Debugf("Found field %q in hits", f)
foundFields[f] = struct{}{}
}
return true
}()
assertMinCount := func() bool {
if config.Assert.MinCount > 0 {
return hits.size() >= config.Assert.MinCount
}
// By default at least one document
return hits.size() > 0
}()
return assertFieldsPresent && assertMinCount && assertHitCount, nil
}, 1*time.Second, waitForDataTimeout)
if waitErr != nil {
return nil, waitErr
}
if !passed {
return nil, testrunner.ErrTestCaseFailed{Reason: fmt.Sprintf("could not find the expected hits in %s data stream", dataStream)}
}
return hits, nil
}