public KafkaFetcher create()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/fetcher/KafkaFetcherFactory.java [47:111]


  /**
   * Creates a {@link KafkaFetcher} for the topic named in the job template's Kafka consumer task.
   *
   * <p>The wrapped fetcher thread is selected by comparing that topic against the job template:
   *
   * <ul>
   *   <li>{@code RetryTopicKafkaFetcher} when the topic equals the RPC dispatcher task's retry
   *       queue topic, or when {@code RetryUtils.findRetryQueueWithTopicName} yields a retry
   *       queue for it;
   *   <li>{@code DlqTopicKafkaFetcher} when the topic equals the RPC dispatcher task's DLQ topic;
   *   <li>{@code OriginalTopicKafkaFetcher} in every other case.
   * </ul>
   *
   * <p>Metrics are emitted on a scope tagged with the consumer group, topic and partition: a
   * creation-latency timer that is started before the bootstrap servers are looked up and stopped
   * once the fetcher has been built, and a counter that is incremented when the broker connection
   * string for the job's cluster/security key is already present in the cache.
   *
   * @param jobTemplate job whose consumer task, dispatcher task and security config drive the
   *     fetcher selection
   * @param threadId identifier handed through to the concrete fetcher thread
   * @param coreInfra source of the metrics scope, also handed through to the fetcher thread
   * @return a {@link KafkaFetcher} wrapping the selected fetcher thread
   * @throws Exception if the bootstrap server lookup or the fetcher construction fails; the
   *     creation timer is not stopped on that path
   */
  public KafkaFetcher<byte[], byte[]> create(Job jobTemplate, String threadId, CoreInfra coreInfra)
      throws Exception {
    final String topic = jobTemplate.getKafkaConsumerTask().getTopic();
    final String consumerGroup = jobTemplate.getKafkaConsumerTask().getConsumerGroup();

    final Scope taggedScope =
        coreInfra
            .scope()
            .tagged(
                StructuredTags.builder()
                    .setKafkaGroup(consumerGroup)
                    .setKafkaTopic(topic)
                    .setKafkaPartition(jobTemplate.getKafkaConsumerTask().getPartition())
                    .build());
    final Stopwatch creationStopwatch = taggedScope.timer(MetricNames.CREATION_LATENCY).start();

    // Record a cache hit before get(), since get() may populate the entry itself.
    final ClusterAndIsSecureKey resolverKey = ClusterAndIsSecureKey.of(jobTemplate);
    final boolean alreadyResolved = brokerConnectionStringMap.asMap().containsKey(resolverKey);
    if (alreadyResolved) {
      taggedScope.counter(MetricNames.RESOLVER_CACHE_HIT).inc(1);
    }
    final String bootstrapServers = brokerConnectionStringMap.get(resolverKey);

    final AutoOffsetResetPolicy autoOffsetResetPolicy =
        jobTemplate.getKafkaConsumerTask().getAutoOffsetResetPolicy();

    boolean isSecure = false;
    if (jobTemplate.hasSecurityConfig()) {
      isSecure = jobTemplate.getSecurityConfig().getIsSecure();
    }

    final Optional<RetryQueue> retryQueue =
        RetryUtils.findRetryQueueWithTopicName(jobTemplate, topic);

    // TODO: This is not extensible way of encoding this. The FetcherFactory should make a decision
    // on which type of fetcher to use only using information available in the KafkaConsumerTask.
    // Allow both normal retry as well as tiered to get instance of RetryTopicKafkaFetcher till
    // kcpserver changes are made to populate RetryConfig in Job
    final boolean consumesRetryTopic =
        topic.equals(jobTemplate.getRpcDispatcherTask().getRetryQueueTopic())
            || retryQueue.isPresent();

    final AbstractKafkaFetcherThread<byte[], byte[]> fetcherThread;
    if (consumesRetryTopic) {
      fetcherThread =
          RetryTopicKafkaFetcher.of(
              threadId,
              bootstrapServers,
              consumerGroup,
              autoOffsetResetPolicy,
              config,
              retryQueue,
              isSecure,
              coreInfra);
    } else if (topic.equals(jobTemplate.getRpcDispatcherTask().getDlqTopic())) {
      fetcherThread =
          DlqTopicKafkaFetcher.of(
              threadId, bootstrapServers, consumerGroup, config, isSecure, coreInfra);
    } else {
      fetcherThread =
          OriginalTopicKafkaFetcher.of(
              threadId,
              bootstrapServers,
              consumerGroup,
              autoOffsetResetPolicy,
              jobTemplate.getKafkaConsumerTask().getIsolationLevel(),
              jobTemplate.getKafkaConsumerTask().getProcessingDelayMs(),
              config,
              isSecure,
              coreInfra);
    }

    // Wrap before stopping the timer so the wrapper construction is part of the measured latency.
    final KafkaFetcher<byte[], byte[]> wrapped = new KafkaFetcher<>(fetcherThread);
    creationStopwatch.stop();
    return wrapped;
  }