public static void setup()

in uforwarder/src/integrationTest/java/com/uber/data/kafka/consumerproxy/UforwarderIntegrationTests.java [100:154]


  public static void setup() throws Exception {
    mockConsumerServer =
        MockConsumerServiceStarter.startTestService(
            TEST_GROUP_NAME,
            ImmutableList.of(
                // Simple consumer
                ConsumerBytesServerMethodDefinition.of(
                    TEST_GROUP_NAME,
                    TEST_TOPIC_NAME,
                    new MockConsumerServiceStarter.TestKafkaConsumerHandler()),
                // RQ + DLQ
                ConsumerBytesServerMethodDefinition.of(
                    TEST_GROUP_NAME,
                    TEST_TOPIC_2_NAME,
                    new MockConsumerServiceStarter.NackingTestKafkaConsumerHandler()),
                // TRQ
                ConsumerBytesServerMethodDefinition.of(
                    TEST_GROUP_NAME,
                    TEST_TOPIC_3_NAME,
                    new MockConsumerServiceStarter.RetryingTestKafkaConsumerHandler())));
    zkServer =
        new GenericContainer(TEST_IMAGE_ZOOKEEPER)
            .withExposedPorts(Constants.ZOOKEEPER_PORT)
            .withEnv("ALLOW_ANONYMOUS_LOGIN", "yes");
    zkServer.setNetwork(network);
    zkServer.start();
    NetworkUtils.assertPortInUseWithTimeout(zkServer.getMappedPort(ZOOKEEPER_PORT), 30);

    kafkaServer =
        new KafkaContainer(TEST_IMAGE_KAFKA)
            .withExternalZookeeper(
                NetworkUtils.getIpAddress(zkServer, network) + ":" + ZOOKEEPER_PORT);
    kafkaServer.setNetwork(network);
    kafkaServer.start();
    NetworkUtils.assertPortInUseWithTimeout(kafkaServer.getFirstMappedPort(), 30);

    controllerGrpcPort =
        Failsafe.with(
                new RetryPolicy<Integer>()
                    .withDelay(Duration.ofSeconds(1))
                    .withMaxRetries(2)
                    .handle(ConditionTimeoutException.class))
            .get(
                () -> {
                  int port = NetworkUtils.getRandomAvailablePort();
                  UForwarderStarter.startUForwarderMaster(
                      kafkaServer.getBootstrapServers(),
                      String.format(ZK_CONNECT_TEMPLATE, zkServer.getMappedPort(ZOOKEEPER_PORT)),
                      port);
                  return port;
                });

    UForwarderStarter.startUForwarderWorker(
        kafkaServer.getBootstrapServers(), Constants.MASTER_HOST + ":" + controllerGrpcPort);
  }