in uforwarder/src/integrationTest/java/com/uber/data/kafka/consumerproxy/UforwarderIntegrationTests.java [352:403]
private AddJobGroupRequest getAddJobGroupRequest(
String topicName,
String consumerGroupName,
String procedureName,
String dlqTopicName,
List<String> retryTopicNames) {
RetryConfig.Builder retryConfigBuilder = RetryConfig.newBuilder();
if (!retryTopicNames.isEmpty()) {
retryConfigBuilder.setRetryEnabled(true);
retryTopicNames.forEach(
r ->
retryConfigBuilder.addRetryQueues(
RetryQueue.newBuilder()
.setProcessingDelayMs(0)
.setMaxRetryCount(2)
.setRetryCluster(TEST_CLUSTER_NAME)
.setRetryQueueTopic(r)
.build()));
}
return AddJobGroupRequest.newBuilder()
.setJobGroup(
JobGroup.newBuilder()
.setType(JobType.JOB_TYPE_KAFKA_CONSUMER_TO_RPC_DISPATCHER)
.setJobGroupId(
String.join(
JOB_GROUP_ID_DELIMITER, topicName, TEST_CLUSTER_NAME, consumerGroupName))
.setKafkaConsumerTaskGroup(
KafkaConsumerTaskGroup.newBuilder()
.setCluster(TEST_CLUSTER_NAME)
.setTopic(topicName)
.setConsumerGroup(consumerGroupName)
.build())
.setRpcDispatcherTaskGroup(
RpcDispatcherTaskGroup.newBuilder()
.setUri("dns:///localhost:" + mockConsumerServer.getPort())
.setRpcTimeoutMs(1000)
.setProcedure(procedureName)
.setDlqCluster(TEST_CLUSTER_NAME)
.setDlqTopic(dlqTopicName)
.build())
.setFlowControl(
FlowControl.newBuilder()
.setBytesPerSec(10)
.setMaxInflightMessages(10)
.setMessagesPerSec(10)
.build())
.setRetryConfig(retryConfigBuilder)
.build())
.setJobGroupState(JobState.JOB_STATE_RUNNING)
.build();
}