systemtest/providers.go (117 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package systemtest import ( "context" "testing" "time" "go.uber.org/zap" apmqueue "github.com/elastic/apm-queue/v2" "github.com/elastic/apm-queue/v2/kafka" ) type config struct { processor apmqueue.Processor sync bool dt apmqueue.DeliveryType maxPollRecords int loggerF func(testing.TB) *zap.Logger topicsF func(testing.TB) []apmqueue.Topic partitions int } const ( defaultProvisionTimeout = 90 * time.Second defaultConsumerWaitTimeout = 90 * time.Second defaultConsumerExitTimeout = 20 * time.Second ) var ( defaultCfg = config{ sync: true, loggerF: TestLogger, topicsF: func(t testing.TB) []apmqueue.Topic { topics := SuffixTopics(apmqueue.Topic(t.Name())) return topics }, partitions: 1, } ) type option func(*config) type providerF func(testing.TB, ...option) (apmqueue.Producer, apmqueue.Consumer) func forEachProvider(t *testing.T, f func(*testing.T, providerF)) { t.Run("Kafka", func(t *testing.T) { f(t, kafkaTypes) }) } func forEachDeliveryType(t *testing.T, f func(*testing.T, apmqueue.DeliveryType)) { t.Run("ALOD", func(t *testing.T) { f(t, apmqueue.AtLeastOnceDeliveryType) }) t.Run("AMOD", func(t *testing.T) { f(t, apmqueue.AtMostOnceDeliveryType) }) } func runAsyncAndSync(t *testing.T, f func(*testing.T, bool)) { t.Run("sync", func(t *testing.T) { f(t, true) }) t.Run("async", func(t *testing.T) { f(t, false) }) } func kafkaTypes(t testing.TB, opts ...option) (apmqueue.Producer, apmqueue.Consumer) { ctx, cancel := context.WithTimeout(context.Background(), defaultProvisionTimeout) defer cancel() cfg := defaultCfg for _, opt := range opts { opt(&cfg) } logger := cfg.loggerF(t) topics := cfg.topicsF(t) CreateKafkaTopics(ctx, t, cfg.partitions, topics...) producer := newKafkaProducer(t, kafka.ProducerConfig{ CommonConfig: kafka.CommonConfig{Logger: logger.Named("producer")}, Sync: cfg.sync, }) consumer := newKafkaConsumer(t, kafka.ConsumerConfig{ CommonConfig: kafka.CommonConfig{Logger: logger.Named("consumer")}, Topics: topics, GroupID: t.Name(), Processor: cfg.processor, Delivery: cfg.dt, }) return producer, consumer } func withProcessor(p apmqueue.Processor) option { return func(c *config) { c.processor = p } } func withSync(sync bool) option { return func(c *config) { c.sync = sync } } func withDeliveryType(dt apmqueue.DeliveryType) option { return func(c *config) { c.dt = dt } } func withMaxPollRecords(max int) option { return func(c *config) { c.maxPollRecords = max } } func withTopic(topicsGenerator func(testing.TB) apmqueue.Topic) option { return func(c *config) { c.topicsF = func(t testing.TB) []apmqueue.Topic { return []apmqueue.Topic{topicsGenerator(t)} } } } func withTopicsGenerator(topicsGenerator func(testing.TB) []apmqueue.Topic) option { return func(c *config) { c.topicsF = topicsGenerator } } func withPartitions(count int) option { return func(c *config) { c.partitions = count } }