in systemtest/infra_kafka.go [71:134]
func ProvisionKafka(ctx context.Context) error {
if err := DestroyKafka(ctx); err != nil {
return err
}
if err := execCommand(ctx,
"docker", "run", "--detach", "--name", redpandaContainerName,
"--publish=0:9093", "--health-cmd", "rpk cluster health | grep 'Healthy:.*true'",
redpandaContainerImage, "redpanda", "start",
"--kafka-addr=internal://0.0.0.0:9092,external://0.0.0.0:9093",
"--smp=1", "--memory=1G",
"--mode=dev-container",
); err != nil {
return fmt.Errorf("failed to create Redpanda container: %w", err)
}
logger().Info("waiting for Redpanda to be ready...")
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
cmd := exec.CommandContext(ctx, "docker", "inspect", redpandaContainerName)
cmd.Stderr = os.Stderr
output, err := cmd.Output()
if err != nil {
return fmt.Errorf("'docker inspect' failed: %w", err)
}
var containers []struct {
State struct {
Health struct {
Status string
}
}
NetworkSettings struct {
Ports map[string][]struct {
HostIP string
HostPort string
}
}
}
if err := json.Unmarshal(output, &containers); err != nil {
return fmt.Errorf("failed to decode 'docker inspect' output: %w", err)
}
if n := len(containers); n != 1 {
return fmt.Errorf("expected 1 container, got %d", n)
}
c := containers[0]
if c.State.Health.Status == "healthy" {
portMapping, ok := c.NetworkSettings.Ports["9093/tcp"]
if !ok || len(portMapping) == 0 {
return errors.New("missing port mapping in 'docker inspect' output")
}
redpandaHostPort = portMapping[0].HostPort
return nil
}
select {
case <-ctx.Done():
return fmt.Errorf(
"context cancelled while waiting for Redpanda to become healthy: %w",
ctx.Err(),
)
case <-ticker.C:
}
}
}