func ProvisionKafka()

in systemtest/infra_kafka.go [71:134]


func ProvisionKafka(ctx context.Context) error {
	if err := DestroyKafka(ctx); err != nil {
		return err
	}

	if err := execCommand(ctx,
		"docker", "run", "--detach", "--name", redpandaContainerName,
		"--publish=0:9093", "--health-cmd", "rpk cluster health | grep 'Healthy:.*true'",
		redpandaContainerImage, "redpanda", "start",
		"--kafka-addr=internal://0.0.0.0:9092,external://0.0.0.0:9093",
		"--smp=1", "--memory=1G",
		"--mode=dev-container",
	); err != nil {
		return fmt.Errorf("failed to create Redpanda container: %w", err)
	}

	logger().Info("waiting for Redpanda to be ready...")
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		cmd := exec.CommandContext(ctx, "docker", "inspect", redpandaContainerName)
		cmd.Stderr = os.Stderr
		output, err := cmd.Output()
		if err != nil {
			return fmt.Errorf("'docker inspect' failed: %w", err)
		}
		var containers []struct {
			State struct {
				Health struct {
					Status string
				}
			}
			NetworkSettings struct {
				Ports map[string][]struct {
					HostIP   string
					HostPort string
				}
			}
		}
		if err := json.Unmarshal(output, &containers); err != nil {
			return fmt.Errorf("failed to decode 'docker inspect' output: %w", err)
		}
		if n := len(containers); n != 1 {
			return fmt.Errorf("expected 1 container, got %d", n)
		}
		c := containers[0]
		if c.State.Health.Status == "healthy" {
			portMapping, ok := c.NetworkSettings.Ports["9093/tcp"]
			if !ok || len(portMapping) == 0 {
				return errors.New("missing port mapping in 'docker inspect' output")
			}
			redpandaHostPort = portMapping[0].HostPort
			return nil
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf(
				"context cancelled while waiting for Redpanda to become healthy: %w",
				ctx.Err(),
			)
		case <-ticker.C:
		}
	}
}