in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/KafkaCheckpointManager.java [36:93]
/**
 * Resolves the Kafka offset that should be committed for the topic-partition of the given job.
 *
 * <p>Returns {@code KafkaUtils.MAX_INVALID_OFFSET_TO_COMMIT} when the commit should be skipped:
 * no checkpoint info is tracked for the partition, the checkpointed offset is invalid, or the
 * offset to commit is behind the recorded committed offset. When the offset equals the recorded
 * committed offset it is still returned, but counted as a duplicate commit.
 *
 * @param job the job whose Kafka consumer task identifies the topic-partition to commit for
 * @return the offset to commit, or {@code KafkaUtils.MAX_INVALID_OFFSET_TO_COMMIT} to skip
 */
public long getOffsetToCommit(Job job) {
  preConditionCheck(job);
  TopicPartition partition =
      new TopicPartition(
          job.getKafkaConsumerTask().getTopic(), job.getKafkaConsumerTask().getPartition());
  final Scope taggedScope =
      scope.tagged(
          StructuredTags.builder()
              .setKafkaGroup(job.getKafkaConsumerTask().getConsumerGroup())
              .setKafkaTopic(partition.topic())
              .setKafkaPartition(partition.partition())
              .build());
  CheckpointInfo info = checkpointInfoMap.get(partition);

  // No checkpoint state tracked for this partition — nothing to commit.
  if (info == null) {
    LOGGER.warn(
        "skip offset commit",
        StructuredLogging.jobId(job.getJobId()),
        StructuredLogging.kafkaGroup(job.getKafkaConsumerTask().getConsumerGroup()),
        StructuredLogging.kafkaTopic(partition.topic()),
        StructuredLogging.kafkaPartition(partition.partition()),
        StructuredLogging.reason("no checkpoint info"));
    taggedScope.counter(MetricNames.OFFSET_COMMIT_SKIP).inc(1);
    return KafkaUtils.MAX_INVALID_OFFSET_TO_COMMIT;
  }

  // Checkpoint holds no valid offset (at or below the invalid-offset sentinel).
  if (info.getOffsetToCommit() <= KafkaUtils.MAX_INVALID_OFFSET_TO_COMMIT) {
    LOGGER.debug(
        "skip offset commit",
        StructuredLogging.jobId(job.getJobId()),
        StructuredLogging.kafkaGroup(job.getKafkaConsumerTask().getConsumerGroup()),
        StructuredLogging.kafkaTopic(partition.topic()),
        StructuredLogging.kafkaPartition(partition.partition()),
        StructuredLogging.reason("checkpoint info = -1"));
    taggedScope.counter(MetricNames.OFFSET_COMMIT_SKIP).inc(1);
    return KafkaUtils.MAX_INVALID_OFFSET_TO_COMMIT;
  }

  // Offset to commit is behind the recorded committed offset — skip the stale commit.
  if (info.getOffsetToCommit() < info.getCommittedOffset()) {
    LOGGER.debug(
        "skip offset commit",
        StructuredLogging.jobId(job.getJobId()),
        StructuredLogging.kafkaGroup(job.getKafkaConsumerTask().getConsumerGroup()),
        StructuredLogging.kafkaTopic(partition.topic()),
        StructuredLogging.kafkaPartition(partition.partition()),
        StructuredLogging.kafkaOffset(info.getOffsetToCommit()),
        StructuredLogging.reason("offset already committed"));
    taggedScope.counter(MetricNames.OFFSET_COMMIT_SKIP).inc(1);
    return KafkaUtils.MAX_INVALID_OFFSET_TO_COMMIT;
  }

  // Same offset as the last recorded commit: count the duplication, but still return it to the
  // caller (original behavior — the == branch returns the offset just like the > branch).
  if (info.getOffsetToCommit() == info.getCommittedOffset()) {
    LOGGER.debug(
        "commit offset already exist",
        StructuredLogging.jobId(job.getJobId()),
        StructuredLogging.kafkaGroup(job.getKafkaConsumerTask().getConsumerGroup()),
        StructuredLogging.kafkaTopic(partition.topic()),
        StructuredLogging.kafkaPartition(partition.partition()),
        StructuredLogging.kafkaOffset(info.getOffsetToCommit()),
        StructuredLogging.reason("offset already committed"));
    taggedScope.counter(MetricNames.OFFSET_COMMIT_DUPLICATION).inc(1);
  }
  return info.getOffsetToCommit();
}