public void doWork()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/AbstractKafkaFetcherThread.java [236:369]
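
One iteration of the fetcher thread's work loop: reconcile the consumer's partition assignment with the pipeline's job state, seek and commit offsets, poll records, merge in records released by the delay-process manager, and hand the batch to the processor sink; on any failure, rewind to the checkpointed fetch offsets to avoid data loss.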


  public void doWork() {
    Preconditions.checkNotNull(pipelineStateManager, "pipeline state manager required");
    Preconditions.checkNotNull(processorSink, "processor sink required");
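    // if the downstream processor sink has stopped, shut this fetcher down as well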
    if (!processorSink.isRunning()) {
      initiateShutdown();
      cleanup();
      return;
    }
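    // report via a gauge that the fetcher is alive for this cluster / consumer group / topic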
    scope
        .tagged(
            StructuredTags.builder()
                .setKafkaCluster(
                    pipelineStateManager.getJobTemplate().getKafkaConsumerTask().getCluster())
                .setKafkaGroup(
                    pipelineStateManager.getJobTemplate().getKafkaConsumerTask().getConsumerGroup())
                .setKafkaTopic(
                    pipelineStateManager.getJobTemplate().getKafkaConsumerTask().getTopic())
                .build())
        .gauge(MetricNames.FETCHER_KAFKA_WORKING)
        .update(1.0);
    // step 1: discover topic partition changes
    Map<TopicPartition, Job> addedTopicPartitionJobMap = new HashMap<>();
    Map<TopicPartition, Job> removedTopicPartitionJobMap = new HashMap<>();
    Map<TopicPartition, Job> allTopicPartitionJobMap = new HashMap<>();
    // the extractTopicPartitionMap method modifies the three passed-in maps, so do not make
    // them immutable.
    boolean assignmentChange =
        extractTopicPartitionMap(
            addedTopicPartitionJobMap, removedTopicPartitionJobMap, allTopicPartitionJobMap);

    // step 2: handle newly added topic partitions
    // specifically, (1) the checkpoint manager starts to track them, (2) topic partitions with a
    // non-negative start offset are extracted for further processing, and (3) committed offsets
    // are fetched from the broker and put into the checkpoint manager
    Map<TopicPartition, OffsetAndMetadata> brokerCommittedOffset = new HashMap<>();
    if (!addedTopicPartitionJobMap.isEmpty()) {
      brokerCommittedOffset = getBrokerCommittedOffset(addedTopicPartitionJobMap.keySet());
    }
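    // seed the checkpoint manager with the newly added partitions; the returned offsets are the
    // start positions that seekStartOffset uses below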
    Map<TopicPartition, Long> topicPartitionOffsetMap =
        addToCheckPointManager(addedTopicPartitionJobMap, brokerCommittedOffset);

    // step 2.1: track throughput and in-flight messages for the newly added and the removed
    // topic partitions
    adjustTracker(addedTopicPartitionJobMap, removedTopicPartitionJobMap);

    // use kafkaConsumerLock to ensure thread-safe access to kafkaConsumer and in-order
    // enqueueing of ConsumerRecords
    try {
      // step 3: perform topic partition addition and deletion
      // step 3.1: update assignment
      if (assignmentChange) {
        LOGGER.info(
            "assignment changed",
            StructuredLogging.kafkaTopicPartitions(allTopicPartitionJobMap.keySet()));
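        // assign() replaces the consumer's previous assignment with the new full partition set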
        kafkaConsumer.assign(allTopicPartitionJobMap.keySet());

        // delete removed jobs from delayProcessManager
        delayProcessManager.delete(removedTopicPartitionJobMap.keySet());

        // Resume all topic partitions in case they were paused before. If we don't resume them
        // and a previously paused topic partition is assigned to this worker again, that
        // partition stays paused and we cannot fetch messages from it.
        // Partitions paused by delayProcessManager are intentionally excluded below; they are
        // resumed by DelayProcessManager in resumeTopicPartitions instead.
        List<TopicPartition> pausedTopicPartitions = delayProcessManager.getAll();
        Collection<TopicPartition> topicPartitions =
            allTopicPartitionJobMap.keySet().stream()
                .filter(tp -> !pausedTopicPartitions.contains(tp))
                .collect(Collectors.toSet());
        kafkaConsumer.resume(topicPartitions);
      }

      // step 3.2: seek start offsets when necessary
      seekStartOffset(topicPartitionOffsetMap, allTopicPartitionJobMap);

      // step 4: commit offsets
      commitOffsets(allTopicPartitionJobMap);

      // step 5: poll messages
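      // the poll may return null when no records were fetched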
      @Nullable ConsumerRecords<K, V> records = pollMessages(allTopicPartitionJobMap);

      // step 6: get resumed topic partitions
      Map<TopicPartition, List<ConsumerRecord<K, V>>> resumedRecords =
          delayProcessManager.resumePausedPartitionsAndRecords();

      // step 7: merge resumed records with newly polled records (see the sketch after this
      // method)
      @Nullable ConsumerRecords<K, V> mergedRecords = mergeRecords(records, resumedRecords);

      // step 8: update the actual running job status here; because we use async commit, offsets
      // are committed to the Kafka servers while we are polling
      updateActualRunningJobStatus();

      // step 9: process messages
      processFetchedData(mergedRecords, allTopicPartitionJobMap);

      // step 10: log and report metric
      logTopicPartitionOffsetInfo(allTopicPartitionJobMap);
    } catch (Throwable e) {
      // TODO (T4575853): recreate kafka consumer if kafka consumer is closed.
      scope
          .tagged(
              StructuredTags.builder()
                  .setKafkaCluster(
                      pipelineStateManager.getJobTemplate().getKafkaConsumerTask().getCluster())
                  .setKafkaGroup(
                      pipelineStateManager
                          .getJobTemplate()
                          .getKafkaConsumerTask()
                          .getConsumerGroup())
                  .setKafkaTopic(
                      pipelineStateManager.getJobTemplate().getKafkaConsumerTask().getTopic())
                  .setError(e)
                  .build())
          .gauge(MetricNames.FETCHER_KAFKA_NOT_WORKING)
          .update(1.0);
      // don't log authorization exceptions because they would be logged in large volume
      if (!(e instanceof TopicAuthorizationException)) {
        LOGGER.error("failed to doWork", e);
      }
      // reset the consumer position to the checkpointed fetch offset to avoid data loss
      for (Map.Entry<TopicPartition, Job> entry : allTopicPartitionJobMap.entrySet()) {
        long fetchOffset = checkpointManager.getCheckpointInfo(entry.getValue()).getFetchOffset();
        if (fetchOffset >= 0) {
          kafkaConsumer.seek(entry.getKey(), fetchOffset);
        }
      }
    }
  }
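
For readers unfamiliar with step 7, here is a minimal, hypothetical sketch of what a merge like mergeRecords could do, assuming kafka-clients' ConsumerRecords can be rebuilt from a per-partition map. This is an illustration, not the class's actual implementation; the helper name mergeRecordsSketch and its ordering choice are assumptions. Resumed records are placed ahead of freshly polled ones so per-partition offset order is preserved.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper; not the actual mergeRecords in AbstractKafkaFetcherThread.
static <K, V> ConsumerRecords<K, V> mergeRecordsSketch(
    ConsumerRecords<K, V> polled, // may be null when the poll fetched nothing
    Map<TopicPartition, List<ConsumerRecord<K, V>>> resumed) {
  Map<TopicPartition, List<ConsumerRecord<K, V>>> merged = new HashMap<>();
  // resumed records go first: they carry older offsets than anything just polled
  resumed.forEach(
      (tp, list) -> merged.computeIfAbsent(tp, k -> new ArrayList<>()).addAll(list));
  if (polled != null) {
    for (TopicPartition tp : polled.partitions()) {
      merged.computeIfAbsent(tp, k -> new ArrayList<>()).addAll(polled.records(tp));
    }
  }
  return new ConsumerRecords<>(merged);
}

Rebuilding a fresh ConsumerRecords this way would keep the downstream processFetchedData path identical for polled, resumed, and merged batches.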