in uforwarder-container/src/integrationTest/java/com/uber/data/kafka/consumerproxy/container/UforwarderContainerIntegrationTests.java [84:136]
public static void setup() throws Exception {
  // Start-up order is significant: every later component is configured with the address of an
  // earlier one, so each dependency is started before its address is looked up.

  // Step 1: mock consumer service exposing one handler for the test group/topic pair.
  mockConsumerServer =
      MockConsumerServiceStarter.startTestService(
          TEST_GROUP_NAME,
          TEST_SERVICE_GRPC_PORT,
          ImmutableList.of(
              ConsumerBytesServerMethodDefinition.of(
                  TEST_GROUP_NAME,
                  TEST_TOPIC_NAME,
                  new MockConsumerServiceStarter.TestKafkaConsumerHandler())));

  // Step 2: ZooKeeper container, attached to the shared test network.
  zkServer =
      new GenericContainer(TEST_IMAGE_ZOOKEEPER)
          .withExposedPorts(Constants.ZOOKEEPER_PORT)
          .withEnv("ALLOW_ANONYMOUS_LOGIN", "yes");
  zkServer.setNetwork(network);
  zkServer.start();
  // Do not proceed until the host-mapped ZooKeeper port is reported as in use.
  NetworkUtils.assertPortInUseWithTimeout(zkServer.getMappedPort(Constants.ZOOKEEPER_PORT), 30);

  // Step 3: Kafka container pointed at the ZooKeeper container started above.
  // NOTE(review): getIpAddress(container, network) presumably yields the container's address on
  // the shared network, which is why it is paired with the unmapped port - confirm.
  final String zkConnectForKafka =
      NetworkUtils.getIpAddress(zkServer, network) + ":" + Constants.ZOOKEEPER_PORT;
  kafkaServer = new KafkaContainer(TEST_IMAGE_KAFKA).withExternalZookeeper(zkConnectForKafka);
  kafkaServer.setNetwork(network);
  kafkaServer.start();
  NetworkUtils.assertPortInUseWithTimeout(kafkaServer.getFirstMappedPort(), 30);

  // Step 4: prepare ZooKeeper for the controller. This call is issued from the test process, so
  // it uses the container host plus the host-mapped port.
  final String zkConnectFromHost =
      zkServer.getHost() + ":" + zkServer.getMappedPort(Constants.ZOOKEEPER_PORT);
  UForwarderUtils.prepareZookeeperForController(zkConnectFromHost);

  // Bootstrap string handed to both the controller and the worker containers.
  final String kafkaBootstrapServers =
      NetworkUtils.getIpAddress(kafkaServer, network) + ":" + Constants.KAFKA_PORT;

  // Step 5: uForwarder controller, wired to ZooKeeper and Kafka.
  final String zkConnectForController =
      NetworkUtils.getIpAddress(zkServer, network) + ":" + Constants.ZOOKEEPER_PORT;
  controller =
      new UForwarderControllerContainer(TEST_IMAGE_UFORWARDER)
          .withImagePullPolicy(PullPolicy.defaultPolicy())
          .withExposedPorts(CONTROLLER_GRPC_PORT)
          .withZookeeperConnect(zkConnectForController)
          .withKafkaBootstrapString(kafkaBootstrapServers);
  controller.setNetwork(network);
  controller.start();

  // Step 6: uForwarder worker, wired to the controller's gRPC port and to Kafka. The controller
  // must already be running so that its address can be resolved here.
  final String controllerAddress =
      NetworkUtils.getIpAddress(controller, network) + ":" + CONTROLLER_GRPC_PORT;
  worker =
      new UForwarderWorkerContainer(TEST_IMAGE_UFORWARDER)
          .withImagePullPolicy(PullPolicy.defaultPolicy())
          .withController(controllerAddress)
          .withKafkaBootstrapString(kafkaBootstrapServers);
  worker.setNetwork(network);
  worker.start();
}