func assertCleanupState()

in cmd/apmbench/main.go [93:151]


// assertCleanupState blocks until every metric named in cfg.CleanupKeys has
// been observed with all of its per-group values equal to zero, or until ctx
// is done.
//
// Keys are verified strictly in order, polling on a 5 second ticker. A key
// that has been verified is never queried again. A key whose lookup fails, or
// that still reports a non-zero value, stops the current pass; it and every
// key after it are retried on the next tick.
//
// It returns nil once all keys have been verified, or immediately (after
// logging a warning) when no cleanup keys are configured. If ctx is done
// first, the returned error wraps ctx.Err().
func assertCleanupState(ctx context.Context, telemetry telemetry, logger *zap.Logger) error {
	keys := cfg.CleanupKeys
	if len(keys) == 0 {
		// There is nothing to assert on. No waiting happens here: the only
		// effect is a warning that leftover data from this run may affect
		// later results.
		logger.Warn("no cleanup keys specified, benchmark results may get corrupted")
		return nil
	}

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	// next is the index of the first key that has not been verified yet.
	for next := 0; next < len(keys); {
		select {
		case <-ctx.Done():
			return fmt.Errorf("cleanup condition not satisfied: %w", ctx.Err())
		case <-ticker.C:
		}

		// Walk forward over the unverified keys. Leaving the scan via
		// "break scan" skips the post statement, so next keeps pointing at
		// the key that has to be retried on the following tick.
	scan:
		for ; next < len(keys); next++ {
			name := keys[next]
			values, err := telemetry.Get(name)
			if err != nil {
				logger.Warn(
					"failed to get cleanup metric, will be retried",
					zap.Error(err),
					zap.String("key", name),
				)
				break scan
			}
			for group, val := range values {
				if val == 0 {
					continue
				}
				// One non-zero group is enough to fail the key; there is
				// no point in checking the remaining groups or keys.
				logger.Debug(
					"cleanup condition not satisfied, will be retried",
					zap.String("key", name),
					zap.String("group", group),
					zap.Float64("val", val),
				)
				break scan
			}
		}
	}

	// Every cleanup key has been verified.
	logger.Debug("cleanup condition satisfied")
	return nil
}