func()

in internal/testrunner/runners/system/tester.go [1442:1539]


// waitForDocs polls the given data stream until the retrieved documents
// satisfy the assertions configured for the test, and returns the hits from
// the last attempt.
//
// The polling timeout is config.WaitForDataTimeout when it is positive and
// waitForDataDefaultTimeout otherwise. Documents are fetched with r.getDocs
// through wait.UntilTrue using a one second period, and an attempt passes only
// when all of the following hold:
//
//   - config.Assert.HitCount, when non-zero, has been reached and the number
//     of hits is the same as in the previous attempt;
//   - every field in config.Assert.FieldsPresent has been seen in at least one
//     document (fields found in earlier attempts are remembered);
//   - at least config.Assert.MinCount documents were found, or at least one
//     document when MinCount is not set.
//
// It returns an error when HitCount or MinCount exceed elasticsearchQuerySize,
// when r.getDocs or wait.UntilTrue fail, and a testrunner.ErrTestCaseFailed
// when wait.UntilTrue reports that the condition was never met.
func (r *tester) waitForDocs(ctx context.Context, config *testConfig, dataStream string) (*hits, error) {
	// Extra delay applied when the hit count is still changing between attempts.
	const hitCountSettleDelay = 4 * time.Second

	// Use custom timeout if the service can't collect data immediately.
	waitForDataTimeout := waitForDataDefaultTimeout
	if config.WaitForDataTimeout > 0 {
		waitForDataTimeout = config.WaitForDataTimeout
	}

	if config.Assert.HitCount > elasticsearchQuerySize {
		return nil, fmt.Errorf("invalid value for assert.hit_count (%d): it must be lower of the maximum query size (%d)", config.Assert.HitCount, elasticsearchQuerySize)
	}

	if config.Assert.MinCount > elasticsearchQuerySize {
		return nil, fmt.Errorf("invalid value for assert.min_count (%d): it must be lower of the maximum query size (%d)", config.Assert.MinCount, elasticsearchQuerySize)
	}

	// (TODO in future) Optionally exercise service to generate load.
	logger.Debugf("checking for expected data in data stream %q (timeout: %s)...", dataStream, waitForDataTimeout)

	// docs holds the result of the latest attempt; it is named differently
	// from the hits type to avoid shadowing it.
	var docs *hits
	// previousCount is the number of hits seen in the previous attempt.
	previousCount := 0
	// foundFields remembers the expected fields already seen, so they are not
	// searched again in later attempts.
	foundFields := make(map[string]struct{}, len(config.Assert.FieldsPresent))

	passed, waitErr := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) {
		var err error
		docs, err = r.getDocs(ctx, dataStream)
		if err != nil {
			return false, err
		}

		// Record the count only after every assertion has been evaluated, so
		// the hit count check compares against the previous attempt.
		defer func() {
			previousCount = docs.size()
		}()

		hitCountOK := func() bool {
			if config.Assert.HitCount == 0 {
				// not enabled
				return true
			}
			if docs.size() < config.Assert.HitCount {
				return false
			}
			if docs.size() == previousCount {
				return true
			}

			// The count is still changing: give the service more time before
			// the next attempt, but stop waiting as soon as the context is
			// done instead of blocking in an uninterruptible sleep.
			// NOTE(review): cancellation itself is expected to be reported by
			// wait.UntilTrue once this attempt returns - confirm.
			timer := time.NewTimer(hitCountSettleDelay)
			defer timer.Stop()
			select {
			case <-ctx.Done():
			case <-timer.C:
			}
			return false
		}()

		fieldsPresentOK := func() bool {
			if len(config.Assert.FieldsPresent) == 0 {
				// not enabled
				return true
			}
			if docs.size() == 0 {
				// At least there should be one document ingested
				return false
			}
			for _, f := range config.Assert.FieldsPresent {
				if _, seen := foundFields[f]; seen {
					continue
				}
				found := false
				for _, d := range docs.Fields {
					if _, err := d.GetValue(f); err == nil {
						found = true
						break
					}
				}
				if !found {
					return false
				}
				logger.Debugf("Found field %q in hits", f)
				foundFields[f] = struct{}{}
			}
			return true
		}()

		minCountOK := func() bool {
			if config.Assert.MinCount > 0 {
				return docs.size() >= config.Assert.MinCount
			}
			// By default at least one document
			return docs.size() > 0
		}()

		return fieldsPresentOK && minCountOK && hitCountOK, nil
	}, 1*time.Second, waitForDataTimeout)

	if waitErr != nil {
		return nil, waitErr
	}

	if !passed {
		return nil, testrunner.ErrTestCaseFailed{Reason: fmt.Sprintf("could not find the expected hits in %s data stream", dataStream)}
	}

	return docs, nil
}