in uforwarder/src/integrationTest/java/com/uber/data/kafka/consumerproxy/UforwarderIntegrationTests.java [100:154]
public static void setup() throws Exception {
mockConsumerServer =
MockConsumerServiceStarter.startTestService(
TEST_GROUP_NAME,
ImmutableList.of(
// Simple consumer
ConsumerBytesServerMethodDefinition.of(
TEST_GROUP_NAME,
TEST_TOPIC_NAME,
new MockConsumerServiceStarter.TestKafkaConsumerHandler()),
// RQ + DLQ
ConsumerBytesServerMethodDefinition.of(
TEST_GROUP_NAME,
TEST_TOPIC_2_NAME,
new MockConsumerServiceStarter.NackingTestKafkaConsumerHandler()),
// TRQ
ConsumerBytesServerMethodDefinition.of(
TEST_GROUP_NAME,
TEST_TOPIC_3_NAME,
new MockConsumerServiceStarter.RetryingTestKafkaConsumerHandler())));
zkServer =
new GenericContainer(TEST_IMAGE_ZOOKEEPER)
.withExposedPorts(Constants.ZOOKEEPER_PORT)
.withEnv("ALLOW_ANONYMOUS_LOGIN", "yes");
zkServer.setNetwork(network);
zkServer.start();
NetworkUtils.assertPortInUseWithTimeout(zkServer.getMappedPort(ZOOKEEPER_PORT), 30);
kafkaServer =
new KafkaContainer(TEST_IMAGE_KAFKA)
.withExternalZookeeper(
NetworkUtils.getIpAddress(zkServer, network) + ":" + ZOOKEEPER_PORT);
kafkaServer.setNetwork(network);
kafkaServer.start();
NetworkUtils.assertPortInUseWithTimeout(kafkaServer.getFirstMappedPort(), 30);
controllerGrpcPort =
Failsafe.with(
new RetryPolicy<Integer>()
.withDelay(Duration.ofSeconds(1))
.withMaxRetries(2)
.handle(ConditionTimeoutException.class))
.get(
() -> {
int port = NetworkUtils.getRandomAvailablePort();
UForwarderStarter.startUForwarderMaster(
kafkaServer.getBootstrapServers(),
String.format(ZK_CONNECT_TEMPLATE, zkServer.getMappedPort(ZOOKEEPER_PORT)),
port);
return port;
});
UForwarderStarter.startUForwarderWorker(
kafkaServer.getBootstrapServers(), Constants.MASTER_HOST + ":" + controllerGrpcPort);
}