systemtest/infra_kafka.go (163 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package systemtest import ( "bytes" "context" "encoding/json" "errors" "fmt" "io" "net" "os" "os/exec" "strconv" "strings" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" apmqueue "github.com/elastic/apm-queue/v2" "github.com/elastic/apm-queue/v2/kafka" ) const ( redpandaContainerName = "redpanda-apm-queue" redpandaContainerImage = "docker.redpanda.com/redpandadata/redpanda:v23.1.11" ) var ( kafkaBrokers []string redpandaHostPort string ) // InitKafka initialises Kafka configuration, and returns a pair of // functions for provisioning and destroying a Kafka cluster. // // If KAFKA_BROKERS is set, provisioning and destroying are skipped, // and Kafka clients will be configured to communicate with those brokers. func InitKafka() (ProvisionInfraFunc, DestroyInfraFunc, error) { if brokers := os.Getenv("KAFKA_BROKERS"); brokers != "" { logger().Infof("KAFKA_BROKERS is set (%q), skipping Kafka cluster provisioning", brokers) kafkaBrokers = strings.Split(brokers, ",") nop := func(context.Context) error { return nil } return nop, nop, nil } logger().Infof("managing Redpanda in Docker") return ProvisionKafka, DestroyKafka, nil } // ProvisionKafka starts a single node Redpanda broker running as a local // Docker container, and configures Kafka clients to communicate with the // broker by forwarding the necessary port(s). func ProvisionKafka(ctx context.Context) error { if err := DestroyKafka(ctx); err != nil { return err } if err := execCommand(ctx, "docker", "run", "--detach", "--name", redpandaContainerName, "--publish=0:9093", "--health-cmd", "rpk cluster health | grep 'Healthy:.*true'", redpandaContainerImage, "redpanda", "start", "--kafka-addr=internal://0.0.0.0:9092,external://0.0.0.0:9093", "--smp=1", "--memory=1G", "--mode=dev-container", ); err != nil { return fmt.Errorf("failed to create Redpanda container: %w", err) } logger().Info("waiting for Redpanda to be ready...") ticker := time.NewTicker(time.Second) defer ticker.Stop() for { cmd := exec.CommandContext(ctx, "docker", "inspect", redpandaContainerName) cmd.Stderr = os.Stderr output, err := cmd.Output() if err != nil { return fmt.Errorf("'docker inspect' failed: %w", err) } var containers []struct { State struct { Health struct { Status string } } NetworkSettings struct { Ports map[string][]struct { HostIP string HostPort string } } } if err := json.Unmarshal(output, &containers); err != nil { return fmt.Errorf("failed to decode 'docker inspect' output: %w", err) } if n := len(containers); n != 1 { return fmt.Errorf("expected 1 container, got %d", n) } c := containers[0] if c.State.Health.Status == "healthy" { portMapping, ok := c.NetworkSettings.Ports["9093/tcp"] if !ok || len(portMapping) == 0 { return errors.New("missing port mapping in 'docker inspect' output") } redpandaHostPort = portMapping[0].HostPort return nil } select { case <-ctx.Done(): return fmt.Errorf( "context cancelled while waiting for Redpanda to become healthy: %w", ctx.Err(), ) case <-ticker.C: } } } // DestroyKafka destroys the Redpanda Docker container. func DestroyKafka(ctx context.Context) error { if err := execCommand(ctx, "docker", "rm", "-f", redpandaContainerName); err != nil { return fmt.Errorf("failed to delete Redpanda container: %w", err) } return nil } // NewKafkaManager returns a new kafka.Manager for the configured broker. func NewKafkaManager(t testing.TB) *kafka.Manager { mgr, err := kafka.NewManager(kafka.ManagerConfig{ CommonConfig: KafkaCommonConfig(t, kafka.CommonConfig{ Logger: defaultCfg.loggerF(t), }), }) require.NoError(t, err) t.Cleanup(func() { assert.NoError(t, mgr.Close()) }) return mgr } // CreateKafkaTopics interacts with the Kafka broker to create topics, // deleting them when the test completes. // // Topics are created with given partitions and 1 hour of retention. func CreateKafkaTopics(ctx context.Context, t testing.TB, partitions int, topics ...apmqueue.Topic) { manager := NewKafkaManager(t) topicCreator, err := manager.NewTopicCreator(kafka.TopicCreatorConfig{ PartitionCount: partitions, TopicConfigs: map[string]string{ "retention.ms": strconv.FormatInt(time.Hour.Milliseconds(), 10), }, }) require.NoError(t, err) err = topicCreator.CreateTopics(ctx, topics...) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, manager.DeleteTopics( context.Background(), topics..., )) }) } func execCommand(ctx context.Context, command string, args ...string) error { return execCommandStdin(ctx, nil, command, args...) } func execCommandStdin(ctx context.Context, stdin io.Reader, command string, args ...string) error { var buf bytes.Buffer cmd := exec.CommandContext(ctx, command, args...) cmd.Stdout = &buf cmd.Stderr = &buf cmd.Stdin = stdin if err := cmd.Run(); err != nil { return fmt.Errorf( "%s command failed: %w (%s)", command, err, strings.TrimSpace(buf.String()), ) } return nil } // KafkaCommonConfig returns a kafka.CommonConfig suitable for connecting to // the configured Kafka broker in tests. // // When Redpanda is running locally in Docker, this will ignore the advertised // address and use the forwarded port. func KafkaCommonConfig(t testing.TB, cfg kafka.CommonConfig) kafka.CommonConfig { cfg.Brokers = append([]string{}, kafkaBrokers...) if len(cfg.Brokers) == 0 { brokerAddress := fmt.Sprintf("127.0.0.1:%s", redpandaHostPort) netDialer := &net.Dialer{Timeout: 10 * time.Second} cfg.Brokers = []string{brokerAddress} cfg.Dialer = func(ctx context.Context, network, _ string) (net.Conn, error) { // The advertised broker address is not reachable from // the host; replace it with the port-forwarded address. addr := brokerAddress return netDialer.DialContext(ctx, network, addr) } } return cfg }