in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/AbstractKafkaFetcherThread.java [737:790]
/**
 * Registers every newly added job with the checkpoint manager and collects the offsets that the
 * jobs explicitly ask to start from.
 *
 * <p>For each entry of {@code addedTopicPartitionJobMap} this method:
 *
 * <ol>
 *   <li>adds the job to {@code checkpointManager};
 *   <li>if the resulting checkpoint has no commit offset yet and the broker reported a non-null
 *       committed offset for the partition, copies that offset into both the committed offset
 *       and the offset-to-commit of the checkpoint;
 *   <li>if the job's start offset is {@code >= 0}, records it in the returned map;
 *   <li>if the start offset is {@code >= 0} or the end offset is {@code > 0}, publishes both as
 *       gauges tagged with the job's consumer group, topic and partition.
 * </ol>
 *
 * <p>A missing or {@code null} broker offset only skips step 2; steps 3 and 4 still run so that a
 * job with an explicit start offset is never silently dropped from the result.
 *
 * @param addedTopicPartitionJobMap jobs that were just added, keyed by topic partition
 * @param brokerCommittedOffset committed offsets fetched from the broker; may be {@code null},
 *     and individual values may be {@code null}
 * @return a new map from topic partition to start offset, containing only the partitions whose
 *     job has a start offset {@code >= 0}
 */
Map<TopicPartition, Long> addToCheckPointManager(
    Map<TopicPartition, Job> addedTopicPartitionJobMap,
    Map<TopicPartition, OffsetAndMetadata> brokerCommittedOffset) {
  Map<TopicPartition, Long> startingOffsetMap = new HashMap<>();
  for (Map.Entry<TopicPartition, Job> entry : addedTopicPartitionJobMap.entrySet()) {
    TopicPartition topicPartition = entry.getKey();
    Job job = entry.getValue();
    CheckpointInfo checkpointInfo = checkpointManager.addCheckpointInfo(job);
    KafkaConsumerTask task = job.getKafkaConsumerTask();

    // if the offset to commit is not set, we set it to the broker committed offset
    if (!checkpointInfo.isCommitOffsetExists() && brokerCommittedOffset != null) {
      // OffsetAndMetadata returned by kafka could be null; in that case there is nothing to
      // adopt, but the start-offset handling below must still run for this partition.
      OffsetAndMetadata brokerOffset = brokerCommittedOffset.get(topicPartition);
      if (brokerOffset != null) {
        checkpointInfo.setCommittedOffset(brokerOffset.offset());
        checkpointInfo.setOffsetToCommit(brokerOffset.offset());
        LOGGER.info(
            "set committed offset to broker committed offset",
            StructuredLogging.kafkaGroup(task.getConsumerGroup()),
            StructuredLogging.kafkaTopic(task.getTopic()),
            StructuredLogging.kafkaPartition(task.getPartition()),
            StructuredLogging.kafkaOffset(brokerOffset.offset()));
      }
    }

    long startingOffset = task.getStartOffset();
    long endingOffset = task.getEndOffset();
    // only seek start offset when it's a valid offset
    if (startingOffset >= 0) {
      startingOffsetMap.put(topicPartition, startingOffset);
    }
    // report the configured offset range whenever either bound is set
    if (startingOffset >= 0 || endingOffset > 0) {
      Scope scopeWithGroupTopicPartition =
          scope.tagged(
              StructuredTags.builder()
                  .setKafkaGroup(task.getConsumerGroup())
                  .setKafkaTopic(task.getTopic())
                  .setKafkaPartition(task.getPartition())
                  .build());
      scopeWithGroupTopicPartition
          .gauge(MetricNames.TOPIC_PARTITION_OFFSET_START)
          .update(startingOffset);
      scopeWithGroupTopicPartition
          .gauge(MetricNames.TOPIC_PARTITION_OFFSET_END)
          .update(endingOffset);
    }
  }
  return startingOffsetMap;
}