in uforwarder/src/integrationTest/java/com/uber/data/kafka/consumerproxy/UforwarderIntegrationTests.java [246:328]
public void testKafkaConsumerProxyEnd2EndFlowWithTieredRQ() throws Exception {
String procedure =
String.format(
MockConsumerServiceStarter.METHOD_NAME_FORMAT, TEST_GROUP_NAME, TEST_TOPIC_3_NAME);
prepareTopic(TEST_TOPIC_3_NAME, kafkaServer.getBootstrapServers());
prepareTopic(TEST_TIER_1_RETRY_TOPIC_NAME, kafkaServer.getBootstrapServers());
prepareTopic(TEST_TIER_2_RETRY_TOPIC_NAME, kafkaServer.getBootstrapServers());
prepareTopic(TEST_TOPIC_3_DLQ_TOPIC_NAME, kafkaServer.getBootstrapServers());
sendKafkaMessages(
TEST_TOPIC_3_NAME, kafkaServer.getBootstrapServers(), NUMBER_OF_MESSAGES, false);
UForwarderUtils.createJob(
String.format("%s:%s", Constants.MASTER_HOST, controllerGrpcPort),
getAddJobGroupRequest(
TEST_TOPIC_3_NAME,
TEST_GROUP_NAME,
procedure,
TEST_TOPIC_3_DLQ_TOPIC_NAME,
ImmutableList.of(TEST_TIER_1_RETRY_TOPIC_NAME, TEST_TIER_2_RETRY_TOPIC_NAME)));
UForwarderUtils.createJob(
String.format("%s:%s", Constants.MASTER_HOST, controllerGrpcPort),
getAddJobGroupRequest(
TEST_TIER_1_RETRY_TOPIC_NAME,
TEST_GROUP_NAME,
procedure,
TEST_TOPIC_3_DLQ_TOPIC_NAME,
ImmutableList.of(TEST_TIER_1_RETRY_TOPIC_NAME, TEST_TIER_2_RETRY_TOPIC_NAME)));
UForwarderUtils.createJob(
String.format("%s:%s", Constants.MASTER_HOST, controllerGrpcPort),
getAddJobGroupRequest(
TEST_TIER_2_RETRY_TOPIC_NAME,
TEST_GROUP_NAME,
procedure,
TEST_TOPIC_3_DLQ_TOPIC_NAME,
ImmutableList.of(TEST_TIER_1_RETRY_TOPIC_NAME, TEST_TIER_2_RETRY_TOPIC_NAME)));
UForwarderUtils.createJob(
String.format("%s:%s", Constants.MASTER_HOST, controllerGrpcPort),
getAddJobGroupRequest(
TEST_TOPIC_3_DLQ_TOPIC_NAME,
TEST_GROUP_NAME,
procedure,
TEST_TOPIC_3_DLQ_TOPIC_NAME,
ImmutableList.of(TEST_TIER_1_RETRY_TOPIC_NAME, TEST_TIER_2_RETRY_TOPIC_NAME)));
await()
.atMost(240, TimeUnit.SECONDS)
.untilAsserted(
() -> {
// The test handler should receive 6 copies of messages from
// original queue, T1 retry queue(x2), T2 retry queue(x2), and DLQ
Assert.assertTrue(
MockConsumerServiceStarter.RetryingTestKafkaConsumerHandler.INVOKE_COUNT.get()
>= NUMBER_OF_MESSAGES * 6);
for (long i = 0L; i < 6; i++) {
Assert.assertTrue(
MockConsumerServiceStarter.RetryingTestKafkaConsumerHandler
.RETRY_COUNT_TO_INVOKE_COUNT_MAP
.get(i)
.get()
>= NUMBER_OF_MESSAGES);
}
// T1 retry queue
Assert.assertTrue(
MockConsumerServiceStarter.RetryingTestKafkaConsumerHandler
.PHYSICAL_SOURCE_TO_INVOKE_COUNT_MAP
.get(TEST_TIER_1_RETRY_TOPIC_NAME)
.get()
>= NUMBER_OF_MESSAGES * 2);
// T2 retry queue
Assert.assertTrue(
MockConsumerServiceStarter.RetryingTestKafkaConsumerHandler
.PHYSICAL_SOURCE_TO_INVOKE_COUNT_MAP
.get(TEST_TIER_2_RETRY_TOPIC_NAME)
.get()
>= NUMBER_OF_MESSAGES * 2);
// DLQ
Assert.assertTrue(
MockConsumerServiceStarter.RetryingTestKafkaConsumerHandler
.PHYSICAL_SOURCE_TO_INVOKE_COUNT_MAP
.get(TEST_TOPIC_3_DLQ_TOPIC_NAME)
.get()
>= NUMBER_OF_MESSAGES);
});
}