in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/fetcher/KafkaFetcherFactory.java [47:111]
/**
 * Builds the {@link KafkaFetcher} that serves the given job template.
 *
 * <p>The metrics scope used here is tagged with the job's consumer group, topic and partition.
 * Creation latency is reported through {@code MetricNames.CREATION_LATENCY}; note that the timer
 * is stopped only when creation succeeds, so a failed creation reports no latency sample.
 *
 * <p>The concrete fetcher thread is chosen from the consumed topic:
 *
 * <ul>
 *   <li>{@code RetryTopicKafkaFetcher} when the topic equals the RPC dispatcher task's retry
 *       queue topic, or when {@code RetryUtils} finds a retry queue for the topic;
 *   <li>{@code DlqTopicKafkaFetcher} when the topic equals the RPC dispatcher task's DLQ topic;
 *   <li>{@code OriginalTopicKafkaFetcher} otherwise.
 * </ul>
 *
 * @param jobTemplate job whose Kafka consumer task, RPC dispatcher task and security config
 *     drive the fetcher configuration
 * @param threadId identifier handed to the underlying fetcher thread
 * @param coreInfra infrastructure handle that supplies the metrics scope and is passed to the
 *     fetcher
 * @return a fetcher wrapping the selected fetcher thread
 * @throws Exception if the bootstrap server lookup or the fetcher construction fails
 */
public KafkaFetcher<byte[], byte[]> create(Job jobTemplate, String threadId, CoreInfra coreInfra)
    throws Exception {
  final String consumerGroup = jobTemplate.getKafkaConsumerTask().getConsumerGroup();
  final String topic = jobTemplate.getKafkaConsumerTask().getTopic();

  Scope fetcherScope =
      coreInfra
          .scope()
          .tagged(
              StructuredTags.builder()
                  .setKafkaGroup(consumerGroup)
                  .setKafkaTopic(topic)
                  .setKafkaPartition(jobTemplate.getKafkaConsumerTask().getPartition())
                  .build());
  Stopwatch creationLatency = fetcherScope.timer(MetricNames.CREATION_LATENCY).start();

  // Probe the cache view first so that a hit can be counted before get() is called on the map.
  ClusterAndIsSecureKey cacheKey = ClusterAndIsSecureKey.of(jobTemplate);
  boolean alreadyResolved = brokerConnectionStringMap.asMap().containsKey(cacheKey);
  if (alreadyResolved) {
    fetcherScope.counter(MetricNames.RESOLVER_CACHE_HIT).inc(1);
  }
  String bootstrapServers = brokerConnectionStringMap.get(cacheKey);

  IsolationLevel isolationLevel = jobTemplate.getKafkaConsumerTask().getIsolationLevel();
  AutoOffsetResetPolicy autoOffsetResetPolicy =
      jobTemplate.getKafkaConsumerTask().getAutoOffsetResetPolicy();
  boolean isSecure = false;
  if (jobTemplate.hasSecurityConfig()) {
    isSecure = jobTemplate.getSecurityConfig().getIsSecure();
  }
  Optional<RetryQueue> retryQueue = RetryUtils.findRetryQueueWithTopicName(jobTemplate, topic);

  // TODO: This is not extensible way of encoding this. The FetcherFactory should make a decision
  // on which type of fetcher to use only using information available in the KafkaConsumerTask.
  // Allow both normal retry as well as tiered to get instance of RetryTopicKafkaFetcher till
  // kcpserver changes are made to populate RetryConfig in Job
  boolean consumesRetryTopic =
      topic.equals(jobTemplate.getRpcDispatcherTask().getRetryQueueTopic())
          || retryQueue.isPresent();
  boolean consumesDlqTopic = topic.equals(jobTemplate.getRpcDispatcherTask().getDlqTopic());

  final AbstractKafkaFetcherThread<byte[], byte[]> fetcherThread;
  if (consumesRetryTopic) {
    fetcherThread =
        RetryTopicKafkaFetcher.of(
            threadId,
            bootstrapServers,
            consumerGroup,
            autoOffsetResetPolicy,
            config,
            retryQueue,
            isSecure,
            coreInfra);
  } else if (consumesDlqTopic) {
    fetcherThread =
        DlqTopicKafkaFetcher.of(
            threadId, bootstrapServers, consumerGroup, config, isSecure, coreInfra);
  } else {
    fetcherThread =
        OriginalTopicKafkaFetcher.of(
            threadId,
            bootstrapServers,
            consumerGroup,
            autoOffsetResetPolicy,
            isolationLevel,
            jobTemplate.getKafkaConsumerTask().getProcessingDelayMs(),
            config,
            isSecure,
            coreInfra);
  }

  KafkaFetcher<byte[], byte[]> result = new KafkaFetcher<>(fetcherThread);
  creationLatency.stop();
  return result;
}