public void testKafkaConsumerProxyEnd2EndFlowWithTieredRQ()

in uforwarder/src/integrationTest/java/com/uber/data/kafka/consumerproxy/UforwarderIntegrationTests.java [246:328]


  public void testKafkaConsumerProxyEnd2EndFlowWithTieredRQ() throws Exception {
    // End-to-end check of the tiered retry queue setup: a source topic, two retry-tier topics
    // and a DLQ topic, each registered as a job with the same group name and procedure.
    final ImmutableList<String> retryTopics =
        ImmutableList.of(TEST_TIER_1_RETRY_TOPIC_NAME, TEST_TIER_2_RETRY_TOPIC_NAME);
    // Order matters only in that topics are prepared and jobs are created in this sequence.
    final ImmutableList<String> consumedTopics =
        ImmutableList.of(
            TEST_TOPIC_3_NAME,
            TEST_TIER_1_RETRY_TOPIC_NAME,
            TEST_TIER_2_RETRY_TOPIC_NAME,
            TEST_TOPIC_3_DLQ_TOPIC_NAME);
    final String procedure =
        String.format(
            MockConsumerServiceStarter.METHOD_NAME_FORMAT, TEST_GROUP_NAME, TEST_TOPIC_3_NAME);

    // Prepare every topic up front, then publish the test messages to the source topic only.
    for (String topic : consumedTopics) {
      prepareTopic(topic, kafkaServer.getBootstrapServers());
    }
    sendKafkaMessages(
        TEST_TOPIC_3_NAME, kafkaServer.getBootstrapServers(), NUMBER_OF_MESSAGES, false);

    // Create one job per topic; every job shares the same DLQ topic and retry-topic list.
    final String controllerAddress =
        String.format("%s:%s", Constants.MASTER_HOST, controllerGrpcPort);
    for (String topic : consumedTopics) {
      UForwarderUtils.createJob(
          controllerAddress,
          getAddJobGroupRequest(
              topic, TEST_GROUP_NAME, procedure, TEST_TOPIC_3_DLQ_TOPIC_NAME, retryTopics));
    }

    await()
        .atMost(240, TimeUnit.SECONDS)
        .untilAsserted(
            () -> {
              // The test handler should receive 6 copies of messages from
              // original queue, T1 retry queue(x2), T2 retry queue(x2), and DLQ
              Assert.assertTrue(
                  MockConsumerServiceStarter.RetryingTestKafkaConsumerHandler.INVOKE_COUNT.get()
                      >= NUMBER_OF_MESSAGES * 6);

              // Each retry count from 0 through 5 must have been observed for every message.
              for (long retryCount = 0L; retryCount < 6; retryCount++) {
                Assert.assertTrue(
                    MockConsumerServiceStarter.RetryingTestKafkaConsumerHandler
                            .RETRY_COUNT_TO_INVOKE_COUNT_MAP
                            .get(retryCount)
                            .get()
                        >= NUMBER_OF_MESSAGES);
              }

              // T1 retry queue, then T2 retry queue: two deliveries per message from each tier.
              for (String retryTopic : retryTopics) {
                Assert.assertTrue(
                    MockConsumerServiceStarter.RetryingTestKafkaConsumerHandler
                            .PHYSICAL_SOURCE_TO_INVOKE_COUNT_MAP
                            .get(retryTopic)
                            .get()
                        >= NUMBER_OF_MESSAGES * 2);
              }

              // DLQ: one delivery per message.
              Assert.assertTrue(
                  MockConsumerServiceStarter.RetryingTestKafkaConsumerHandler
                          .PHYSICAL_SOURCE_TO_INVOKE_COUNT_MAP
                          .get(TEST_TOPIC_3_DLQ_TOPIC_NAME)
                          .get()
                      >= NUMBER_OF_MESSAGES);
            });
  }