public long getOffsetToCommit(Job job)

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/KafkaCheckpointManager.java [36:93]

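Returns the offset that should be committed to Kafka for the job's topic-partition, or the sentinel KafkaUtils.MAX_INVALID_OFFSET_TO_COMMIT when there is nothing safe to commit: no checkpoint state is tracked, the offset to commit never advanced past the invalid sentinel, or the offset is behind what was already committed. An offset equal to the last committed one is still returned, but counted as a duplication.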

  public long getOffsetToCommit(Job job) {
    preConditionCheck(job);

    TopicPartition topicPartition =
        new TopicPartition(
            job.getKafkaConsumerTask().getTopic(), job.getKafkaConsumerTask().getPartition());
    final Scope scopeWithGroupTopicPartition =
        scope.tagged(
            StructuredTags.builder()
                .setKafkaGroup(job.getKafkaConsumerTask().getConsumerGroup())
                .setKafkaTopic(topicPartition.topic())
                .setKafkaPartition(topicPartition.partition())
                .build());
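    // Look up the checkpoint state tracked for this topic-partition.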
    CheckpointInfo checkpointInfo = checkpointInfoMap.get(topicPartition);
    if (checkpointInfo == null) {
      LOGGER.warn(
          "skip offset commit",
          StructuredLogging.jobId(job.getJobId()),
          StructuredLogging.kafkaGroup(job.getKafkaConsumerTask().getConsumerGroup()),
          StructuredLogging.kafkaTopic(topicPartition.topic()),
          StructuredLogging.kafkaPartition(topicPartition.partition()),
          StructuredLogging.reason("no checkpoint info"));
      scopeWithGroupTopicPartition.counter(MetricNames.OFFSET_COMMIT_SKIP).inc(1);
    } else if (checkpointInfo.getOffsetToCommit() <= KafkaUtils.MAX_INVALID_OFFSET_TO_COMMIT) {
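      // The offset to commit has not advanced past the invalid sentinel, so there is nothing to commit yet.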
      LOGGER.debug(
          "skip offset commit",
          StructuredLogging.jobId(job.getJobId()),
          StructuredLogging.kafkaGroup(job.getKafkaConsumerTask().getConsumerGroup()),
          StructuredLogging.kafkaTopic(topicPartition.topic()),
          StructuredLogging.kafkaPartition(topicPartition.partition()),
          StructuredLogging.reason("checkpoint info = -1"));
      scopeWithGroupTopicPartition.counter(MetricNames.OFFSET_COMMIT_SKIP).inc(1);
    } else if (checkpointInfo.getOffsetToCommit() < checkpointInfo.getCommittedOffset()) {
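      // The offset to commit is behind the committed offset; committing it would move the group backwards.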
      LOGGER.debug(
          "skip offset commit",
          StructuredLogging.jobId(job.getJobId()),
          StructuredLogging.kafkaGroup(job.getKafkaConsumerTask().getConsumerGroup()),
          StructuredLogging.kafkaTopic(topicPartition.topic()),
          StructuredLogging.kafkaPartition(topicPartition.partition()),
          StructuredLogging.kafkaOffset(checkpointInfo.getOffsetToCommit()),
          StructuredLogging.reason("offset already committed"));
      scopeWithGroupTopicPartition.counter(MetricNames.OFFSET_COMMIT_SKIP).inc(1);
    } else if (checkpointInfo.getOffsetToCommit() == checkpointInfo.getCommittedOffset()) {
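      // Same offset as the last commit: count it as a duplication, but still return it so the caller may re-commit.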
      LOGGER.debug(
          "commit offset already exist",
          StructuredLogging.jobId(job.getJobId()),
          StructuredLogging.kafkaGroup(job.getKafkaConsumerTask().getConsumerGroup()),
          StructuredLogging.kafkaTopic(topicPartition.topic()),
          StructuredLogging.kafkaPartition(topicPartition.partition()),
          StructuredLogging.kafkaOffset(checkpointInfo.getOffsetToCommit()),
          StructuredLogging.reason("offset already committed"));
      scopeWithGroupTopicPartition.counter(MetricNames.OFFSET_COMMIT_DUPLICATION).inc(1);
      return checkpointInfo.getOffsetToCommit();
    } else {
      return checkpointInfo.getOffsetToCommit();
    }
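    // Sentinel value indicating that this commit cycle should be skipped.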
    return KafkaUtils.MAX_INVALID_OFFSET_TO_COMMIT;
  }
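
For context, a minimal sketch of how a caller might consume the return value. The commit-loop shape, the CommitLoopSketch class, and the commitIfReady method are assumptions for illustration, not part of the file above; Job, KafkaCheckpointManager, and KafkaUtils are the types from this package.

  import java.util.Collections;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.clients.consumer.OffsetAndMetadata;
  import org.apache.kafka.common.TopicPartition;

  // Hypothetical caller sketch: commits only when getOffsetToCommit returns a valid offset.
  final class CommitLoopSketch {
    static void commitIfReady(
        KafkaCheckpointManager checkpointManager,
        KafkaConsumer<byte[], byte[]> consumer,
        Job job) {
      long offsetToCommit = checkpointManager.getOffsetToCommit(job);
      // Values at or below the sentinel mean "skip this cycle".
      if (offsetToCommit > KafkaUtils.MAX_INVALID_OFFSET_TO_COMMIT) {
        TopicPartition topicPartition =
            new TopicPartition(
                job.getKafkaConsumerTask().getTopic(), job.getKafkaConsumerTask().getPartition());
        consumer.commitSync(
            Collections.singletonMap(topicPartition, new OffsetAndMetadata(offsetToCommit)));
      }
    }
  }

Returning a sentinel instead of throwing lets the caller treat "nothing to commit" as a no-op rather than an error path, while the skip and duplication counters preserve visibility into each outcome.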