Map addToCheckPointManager()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/AbstractKafkaFetcherThread.java [619:650]


  /**
   * Registers every newly added job with the checkpoint manager and collects the start offsets
   * that are valid (non-negative).
   *
   * <p>For each job whose start offset is non-negative, or whose end offset is positive, the
   * start and end offsets are also published as gauges on a scope tagged with the job's consumer
   * group, topic and partition.
   *
   * @param addedTopicPartitionJobMap the newly added jobs, keyed by topic partition
   * @return a map from topic partition to start offset, containing only the entries whose start
   *     offset is {@code >= 0}; empty when no job has a valid start offset
   */
  Map<TopicPartition, Long> addToCheckPointManager(
      Map<TopicPartition, Job> addedTopicPartitionJobMap) {
    Map<TopicPartition, Long> seekOffsets = new HashMap<>();
    // Iterating an empty map is a no-op, so no separate emptiness check is needed.
    for (Map.Entry<TopicPartition, Job> assignment : addedTopicPartitionJobMap.entrySet()) {
      Job job = assignment.getValue();
      checkpointManager.addCheckpointInfo(job);

      KafkaConsumerTask consumerTask = job.getKafkaConsumerTask();
      long startOffset = consumerTask.getStartOffset();
      boolean hasValidStartOffset = startOffset >= 0;
      // only seek start offset when it's a valid offset
      if (hasValidStartOffset) {
        seekOffsets.put(assignment.getKey(), startOffset);
      }

      // Offsets are reported only when at least one bound is set: a non-negative start offset
      // or a positive end offset.
      if (!hasValidStartOffset && consumerTask.getEndOffset() <= 0) {
        continue;
      }

      Scope partitionScope =
          scope.tagged(
              StructuredTags.builder()
                  .setKafkaGroup(consumerTask.getConsumerGroup())
                  .setKafkaTopic(consumerTask.getTopic())
                  .setKafkaPartition(consumerTask.getPartition())
                  .build());
      partitionScope.gauge(MetricNames.TOPIC_PARTITION_OFFSET_START).update(startOffset);
      partitionScope
          .gauge(MetricNames.TOPIC_PARTITION_OFFSET_END)
          .update(consumerTask.getEndOffset());
    }
    return seekOffsets;
  }