in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/AbstractKafkaFetcherThread.java [619:650]
Map<TopicPartition, Long> addToCheckPointManager(
Map<TopicPartition, Job> addedTopicPartitionJobMap) {
Map<TopicPartition, Long> startingOffsetMap = new HashMap<>();
if (!addedTopicPartitionJobMap.isEmpty()) {
for (Map.Entry<TopicPartition, Job> entry : addedTopicPartitionJobMap.entrySet()) {
checkpointManager.addCheckpointInfo(entry.getValue());
KafkaConsumerTask config = entry.getValue().getKafkaConsumerTask();
long startingOffset = config.getStartOffset();
// only seek start offset when it's a valid offset
if (startingOffset >= 0) {
startingOffsetMap.put(entry.getKey(), startingOffset);
}
if (startingOffset >= 0 || config.getEndOffset() > 0) {
KafkaConsumerTask task = entry.getValue().getKafkaConsumerTask();
Scope scopeWithGroupTopicPartition =
scope.tagged(
StructuredTags.builder()
.setKafkaGroup(task.getConsumerGroup())
.setKafkaTopic(task.getTopic())
.setKafkaPartition(task.getPartition())
.build());
scopeWithGroupTopicPartition
.gauge(MetricNames.TOPIC_PARTITION_OFFSET_START)
.update(config.getStartOffset());
scopeWithGroupTopicPartition
.gauge(MetricNames.TOPIC_PARTITION_OFFSET_END)
.update(config.getEndOffset());
}
}
}
return startingOffsetMap;
}