public void testKafkaConsumerProxyEnd2EndFlowWithRQAndDLQ()

in uforwarder/src/integrationTest/java/com/uber/data/kafka/consumerproxy/UforwarderIntegrationTests.java [190:234]


  public void testKafkaConsumerProxyEnd2EndFlowWithRQAndDLQ() throws Exception {
    // Scenario: publish messages to the main topic, register one job group per topic
    // (main, retry, DLQ), then wait for the mock handler's invocation counter to reach
    // three times the number of published messages.
    final String procedure =
        String.format(
            MockConsumerServiceStarter.METHOD_NAME_FORMAT, TEST_GROUP_NAME, TEST_TOPIC_2_NAME);
    final String bootstrapServers = kafkaServer.getBootstrapServers();

    // Order matters only in that it mirrors the sequence used for both topic preparation and
    // job creation below: main topic, then retry topic, then DLQ topic.
    final ImmutableList<String> topics =
        ImmutableList.of(TEST_TOPIC_2_NAME, TEST_RETRY_TOPIC_NAME, TEST_TOPIC_2_DLQ_TOPIC_NAME);

    for (String topic : topics) {
      prepareTopic(topic, bootstrapServers);
    }

    // Only the main topic is seeded with messages.
    sendKafkaMessages(TEST_TOPIC_2_NAME, bootstrapServers, NUMBER_OF_MESSAGES, false);

    // Every job group shares the same consumer group, procedure, DLQ topic and retry topic list;
    // only the topic being consumed differs.
    final String masterAddress =
        String.format("%s:%s", Constants.MASTER_HOST, controllerGrpcPort);
    for (String consumedTopic : topics) {
      UForwarderUtils.createJob(
          masterAddress,
          getAddJobGroupRequest(
              consumedTopic,
              TEST_GROUP_NAME,
              procedure,
              TEST_TOPIC_2_DLQ_TOPIC_NAME,
              ImmutableList.of(TEST_RETRY_TOPIC_NAME)));
    }

    await()
        .atMost(240, TimeUnit.SECONDS)
        .untilAsserted(
            () ->
                Assert.assertTrue(
                    // The test handler should receive 3 copies of messages from
                    // original queue, retry queue, and DLQ
                    NUMBER_OF_MESSAGES * 3
                        <= MockConsumerServiceStarter.NackingTestKafkaConsumerHandler.INVOKE_COUNT
                            .get()));
  }