Map addToCheckPointManager()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/AbstractKafkaFetcherThread.java [737:790]


  /**
   * Registers each newly added job with the checkpoint manager and collects the offsets that the
   * jobs explicitly ask to start from.
   *
   * <p>For every entry this method:
   *
   * <ol>
   *   <li>adds the job to {@code checkpointManager};
   *   <li>if the resulting checkpoint has no commit offset yet and the broker reported a non-null
   *       committed offset for the partition, seeds both the committed offset and the offset to
   *       commit with the broker value;
   *   <li>records the job's start offset in the returned map when it is non-negative;
   *   <li>emits start/end offset gauges when the start offset is non-negative or the end offset
   *       is positive.
   * </ol>
   *
   * <p>A missing or null broker offset only skips step 2; steps 3 and 4 still run for that
   * partition.
   *
   * @param addedTopicPartitionJobMap newly added jobs keyed by their topic partition
   * @param brokerCommittedOffset committed offsets reported by the broker; may be {@code null},
   *     may lack entries, and may map a partition to {@code null}
   * @return a new mutable map from topic partition to start offset, containing only the
   *     partitions whose job has a non-negative start offset
   */
  Map<TopicPartition, Long> addToCheckPointManager(
      Map<TopicPartition, Job> addedTopicPartitionJobMap,
      Map<TopicPartition, OffsetAndMetadata> brokerCommittedOffset) {
    Map<TopicPartition, Long> startingOffsetMap = new HashMap<>();
    for (Map.Entry<TopicPartition, Job> entry : addedTopicPartitionJobMap.entrySet()) {
      TopicPartition topicPartition = entry.getKey();
      Job job = entry.getValue();

      CheckpointInfo checkpointInfo = checkpointManager.addCheckpointInfo(job);
      KafkaConsumerTask task = job.getKafkaConsumerTask();

      // if the offset to commit is not set, we set it to the broker committed offset
      if (!checkpointInfo.isCommitOffsetExists() && brokerCommittedOffset != null) {
        // A single lookup covers both "no entry" and "entry mapped to null": the
        // OffsetAndMetadata returned by kafka could be null. In either case only the seeding
        // is skipped; the start-offset handling below must still run for this partition.
        OffsetAndMetadata brokerOffset = brokerCommittedOffset.get(topicPartition);
        if (brokerOffset != null) {
          checkpointInfo.setCommittedOffset(brokerOffset.offset());
          checkpointInfo.setOffsetToCommit(brokerOffset.offset());
          LOGGER.info(
              "set committed offset to broker committed offset",
              StructuredLogging.kafkaGroup(task.getConsumerGroup()),
              StructuredLogging.kafkaTopic(task.getTopic()),
              StructuredLogging.kafkaPartition(task.getPartition()),
              StructuredLogging.kafkaOffset(brokerOffset.offset()));
        }
      }

      long startingOffset = task.getStartOffset();
      long endOffset = task.getEndOffset();
      // only seek start offset when it's a valid offset
      if (startingOffset >= 0) {
        startingOffsetMap.put(topicPartition, startingOffset);
      }
      // report the configured offset range when at least one bound is set
      if (startingOffset >= 0 || endOffset > 0) {
        Scope scopeWithGroupTopicPartition =
            scope.tagged(
                StructuredTags.builder()
                    .setKafkaGroup(task.getConsumerGroup())
                    .setKafkaTopic(task.getTopic())
                    .setKafkaPartition(task.getPartition())
                    .build());
        scopeWithGroupTopicPartition
            .gauge(MetricNames.TOPIC_PARTITION_OFFSET_START)
            .update(startingOffset);
        scopeWithGroupTopicPartition
            .gauge(MetricNames.TOPIC_PARTITION_OFFSET_END)
            .update(endOffset);
      }
    }
    return startingOffsetMap;
  }