uforwarder-container/src/integrationTest/java/com/uber/data/kafka/consumerproxy/container/UforwarderContainerIntegrationTests.java [158:181]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        getAddJobGroupRequest(TEST_TOPIC_NAME, TEST_GROUP_NAME, procedure, "", ImmutableList.of()));

    // Normal case: publish NUMBER_OF_MESSAGES messages to TEST_TOPIC_NAME. The fourth argument is
    // false here and true in the empty-message phase, so it presumably selects empty payloads —
    // TODO confirm against sendKafkaMessages.
    sendKafkaMessages(
        TEST_TOPIC_NAME, kafkaServer.getBootstrapServers(), NUMBER_OF_MESSAGES, false);
    // Wait up to 240 seconds for the mock consumer handler's invoke counter to reach at least
    // NUMBER_OF_MESSAGES. The check is ">=" rather than "==", so extra invocations (presumably
    // redeliveries) do not fail the test.
    await()
        .atMost(240, TimeUnit.SECONDS)
        .untilAsserted(
            () -> {
              Assert.assertTrue(
                  MockConsumerServiceStarter.TestKafkaConsumerHandler.INVOKE_COUNT.get()
                      >= NUMBER_OF_MESSAGES);
            });

    // Empty message case: publish another NUMBER_OF_MESSAGES messages, this time with the fourth
    // argument set to true (presumably "send empty payloads" — TODO confirm against
    // sendKafkaMessages).
    sendKafkaMessages(TEST_TOPIC_NAME, kafkaServer.getBootstrapServers(), NUMBER_OF_MESSAGES, true);
    // NOTE(review): this waits for the same threshold the normal case already waited for, and
    // INVOKE_COUNT is not reset between the two phases in the visible code. The condition may
    // therefore already hold when this wait starts, so it would not prove that any empty message
    // was handled. If the handler is expected to run for empty messages, the threshold probably
    // needs to be 2 * NUMBER_OF_MESSAGES (or the counter reset before this phase) — TODO confirm
    // the intended behavior before changing it.
    await()
        .atMost(240, TimeUnit.SECONDS)
        .untilAsserted(
            () -> {
              Assert.assertTrue(
                  MockConsumerServiceStarter.TestKafkaConsumerHandler.INVOKE_COUNT.get()
                      >= NUMBER_OF_MESSAGES);
            });
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



uforwarder/src/integrationTest/java/com/uber/data/kafka/consumerproxy/UforwarderIntegrationTests.java [172:195]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        getAddJobGroupRequest(TEST_TOPIC_NAME, TEST_GROUP_NAME, procedure, "", ImmutableList.of()));

    // Normal case: publish NUMBER_OF_MESSAGES messages to TEST_TOPIC_NAME. The fourth argument is
    // false here and true in the empty-message phase, so it presumably selects empty payloads —
    // TODO confirm against sendKafkaMessages.
    sendKafkaMessages(
        TEST_TOPIC_NAME, kafkaServer.getBootstrapServers(), NUMBER_OF_MESSAGES, false);
    // Wait up to 240 seconds for the mock consumer handler's invoke counter to reach at least
    // NUMBER_OF_MESSAGES. The check is ">=" rather than "==", so extra invocations (presumably
    // redeliveries) do not fail the test.
    await()
        .atMost(240, TimeUnit.SECONDS)
        .untilAsserted(
            () -> {
              Assert.assertTrue(
                  MockConsumerServiceStarter.TestKafkaConsumerHandler.INVOKE_COUNT.get()
                      >= NUMBER_OF_MESSAGES);
            });

    // Empty message case: publish another NUMBER_OF_MESSAGES messages, this time with the fourth
    // argument set to true (presumably "send empty payloads" — TODO confirm against
    // sendKafkaMessages).
    sendKafkaMessages(TEST_TOPIC_NAME, kafkaServer.getBootstrapServers(), NUMBER_OF_MESSAGES, true);
    // NOTE(review): this waits for the same threshold the normal case already waited for, and
    // INVOKE_COUNT is not reset between the two phases in the visible code. The condition may
    // therefore already hold when this wait starts, so it would not prove that any empty message
    // was handled. If the handler is expected to run for empty messages, the threshold probably
    // needs to be 2 * NUMBER_OF_MESSAGES (or the counter reset before this phase) — TODO confirm
    // the intended behavior before changing it.
    await()
        .atMost(240, TimeUnit.SECONDS)
        .untilAsserted(
            () -> {
              Assert.assertTrue(
                  MockConsumerServiceStarter.TestKafkaConsumerHandler.INVOKE_COUNT.get()
                      >= NUMBER_OF_MESSAGES);
            });
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



