in cmd/apmbench/main.go [93:151]
func assertCleanupState(ctx context.Context, telemetry telemetry, logger *zap.Logger) error {
if len(cfg.CleanupKeys) == 0 {
// If no cleanup keys are specified then, as a best effort, sleep
// for a specific duration to ensure the pipelines have a chance to
// be fully consumed and metrics to be reported.
logger.Warn("no cleanup keys specified, benchmark results may get corrupted")
return nil
}
t := time.NewTicker(5 * time.Second)
defer t.Stop()
var idx int
for {
// cleanup is successful if we have asserted all the cleanup keys.
if idx == len(cfg.CleanupKeys) {
logger.Debug("cleanup condition satisfied")
return nil
}
select {
case <-ctx.Done():
return fmt.Errorf("cleanup condition not satisfied: %w", ctx.Err())
case <-t.C:
for idx < len(cfg.CleanupKeys) {
key := cfg.CleanupKeys[idx]
m, err := telemetry.Get(key)
if err != nil {
logger.Warn(
"failed to get cleanup metric, will be retried",
zap.Error(err),
zap.String("key", key),
)
break
}
ok := true
for gid, v := range m {
if v != 0 {
logger.Debug(
"cleanup condition not satisfied, will be retried",
zap.String("key", key),
zap.String("group", gid),
zap.Float64("val", v),
)
ok = false
break
}
}
// If a cleanup key fails then there is no need to try anything else.
if !ok {
break
}
// If the cleanup metric is successful for the current key then
// move onto the next key.
idx++
}
}
}
}