in uforwarder/src/integrationTest/java/com/uber/data/kafka/consumerproxy/UforwarderIntegrationTests.java [190:234]
public void testKafkaConsumerProxyEnd2EndFlowWithRQAndDLQ() throws Exception {
String procedure =
String.format(
MockConsumerServiceStarter.METHOD_NAME_FORMAT, TEST_GROUP_NAME, TEST_TOPIC_2_NAME);
prepareTopic(TEST_TOPIC_2_NAME, kafkaServer.getBootstrapServers());
prepareTopic(TEST_RETRY_TOPIC_NAME, kafkaServer.getBootstrapServers());
prepareTopic(TEST_TOPIC_2_DLQ_TOPIC_NAME, kafkaServer.getBootstrapServers());
sendKafkaMessages(
TEST_TOPIC_2_NAME, kafkaServer.getBootstrapServers(), NUMBER_OF_MESSAGES, false);
UForwarderUtils.createJob(
String.format("%s:%s", Constants.MASTER_HOST, controllerGrpcPort),
getAddJobGroupRequest(
TEST_TOPIC_2_NAME,
TEST_GROUP_NAME,
procedure,
TEST_TOPIC_2_DLQ_TOPIC_NAME,
ImmutableList.of(TEST_RETRY_TOPIC_NAME)));
UForwarderUtils.createJob(
String.format("%s:%s", Constants.MASTER_HOST, controllerGrpcPort),
getAddJobGroupRequest(
TEST_RETRY_TOPIC_NAME,
TEST_GROUP_NAME,
procedure,
TEST_TOPIC_2_DLQ_TOPIC_NAME,
ImmutableList.of(TEST_RETRY_TOPIC_NAME)));
UForwarderUtils.createJob(
String.format("%s:%s", Constants.MASTER_HOST, controllerGrpcPort),
getAddJobGroupRequest(
TEST_TOPIC_2_DLQ_TOPIC_NAME,
TEST_GROUP_NAME,
procedure,
TEST_TOPIC_2_DLQ_TOPIC_NAME,
ImmutableList.of(TEST_RETRY_TOPIC_NAME)));
await()
.atMost(240, TimeUnit.SECONDS)
.untilAsserted(
() -> {
Assert.assertTrue(
MockConsumerServiceStarter.NackingTestKafkaConsumerHandler.INVOKE_COUNT.get()
// The test handler should receive 3 copies of messages from
// original queue, retry queue, and DLQ
>= NUMBER_OF_MESSAGES * 3);
});
}