in cmd/queuebench/main.go [186:206]
// produce sends the same record to topic in a tight loop until duration has
// elapsed, ctx is done, or p.Produce reports an error.
//
// The record value is size random bytes, generated once up front and reused
// for every call to p.Produce.
//
// It returns nil once the duration has elapsed, ctx.Err() if ctx is done
// before then, and a wrapped error if the size is negative, the random
// payload cannot be generated, or p.Produce fails.
func produce(ctx context.Context, p *kafka.Producer, topic apmqueue.Topic, size int, duration time.Duration) error {
	// make panics on a negative length; report it as an ordinary error.
	if size < 0 {
		return fmt.Errorf("invalid record size: %d", size)
	}
	payload := make([]byte, size)
	if _, err := rand.Read(payload); err != nil {
		return fmt.Errorf("cannot read random bytes: %w", err)
	}
	record := apmqueue.Record{
		Topic: topic,
		Value: payload,
	}
	deadline := time.Now().Add(duration)
	for time.Now().Before(deadline) {
		// Stop promptly on cancellation rather than relying on p.Produce to
		// notice it. NOTE(review): p.Produce may not return an error for a
		// cancelled ctx (e.g. if it only enqueues) — confirm against the
		// producer implementation.
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := p.Produce(ctx, record); err != nil {
			return fmt.Errorf("cannot produce record: %w", err)
		}
	}
	return nil
}