in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/AbstractKafkaFetcherThread.java [793:898]
/**
 * Moves the Kafka consumer to the start position of each requested topic partition and records
 * the matching offsets in the checkpoint manager.
 *
 * <p>An entry of {@code seekOffsetTaskMap} is processed only when {@code topicPartitionJobMap}
 * holds a job for its partition and the requested offset is non-negative; every other entry is
 * skipped. For a processed entry, the requested offset is compared with the partition's beginning
 * and end offsets, and a missing or violated bound is reported through {@code
 * logStartOffsetInfo}. The option returned by {@code getSeekStartOffsetOption} then selects the
 * action: seek to the requested offset, seek to the beginning, seek to the end, or leave the
 * consumer position alone and fall back to the committed offset.
 *
 * @param seekOffsetTaskMap requested start offset for each topic partition
 * @param topicPartitionJobMap job associated with each topic partition
 */
public void seekStartOffset(
    Map<TopicPartition, Long> seekOffsetTaskMap, Map<TopicPartition, Job> topicPartitionJobMap) {
  Preconditions.checkNotNull(pipelineStateManager, "pipeline config manager required");
  if (seekOffsetTaskMap.isEmpty()) {
    return;
  }
  LOGGER.info(
      "seek start offsets",
      StructuredLogging.kafkaTopicPartitions(topicPartitionJobMap.keySet()));

  // Both partition bounds are fetched once, up front, for every requested partition.
  Map<TopicPartition, Long> partitionStartOffsets =
      kafkaConsumer.beginningOffsets(seekOffsetTaskMap.keySet());
  Map<TopicPartition, Long> partitionEndOffsets =
      kafkaConsumer.endOffsets(seekOffsetTaskMap.keySet());

  for (Map.Entry<TopicPartition, Long> request : seekOffsetTaskMap.entrySet()) {
    TopicPartition topicPartition = request.getKey();
    long requestedOffset = request.getValue();
    Job job = topicPartitionJobMap.get(topicPartition);
    // Nothing to do for partitions without a job or with a negative requested offset.
    if (job == null || requestedOffset < 0) {
      continue;
    }

    Long lowerBound = partitionStartOffsets.get(topicPartition);
    Long upperBound = partitionEndOffsets.get(topicPartition);
    String consumerGroup =
        pipelineStateManager.getJobTemplate().getKafkaConsumerTask().getConsumerGroup();

    // Report a missing lower bound, or a requested offset that lies below it.
    if (lowerBound != null) {
      if (requestedOffset < lowerBound) {
        logStartOffsetInfo(
            consumerGroup,
            topicPartition.topic(),
            topicPartition.partition(),
            requestedOffset,
            MetricNames.TOPIC_PARTITION_SEEK_TOO_SMALL);
      }
    } else {
      logStartOffsetInfo(
          consumerGroup,
          topicPartition.topic(),
          topicPartition.partition(),
          requestedOffset,
          MetricNames.TOPIC_PARTITION_SEEK_START_NULL);
    }

    // Report a missing upper bound, or a requested offset that lies above it.
    if (upperBound != null) {
      if (requestedOffset > upperBound) {
        logStartOffsetInfo(
            consumerGroup,
            topicPartition.topic(),
            topicPartition.partition(),
            requestedOffset,
            MetricNames.TOPIC_PARTITION_SEEK_TOO_LARGE);
      }
    } else {
      logStartOffsetInfo(
          consumerGroup,
          topicPartition.topic(),
          topicPartition.partition(),
          requestedOffset,
          MetricNames.TOPIC_PARTITION_SEEK_END_NULL);
    }

    switch (getSeekStartOffsetOption(
        requestedOffset,
        lowerBound,
        upperBound,
        job.getKafkaConsumerTask().getAutoOffsetResetPolicy())) {
      case SEEK_TO_SPECIFIED_OFFSET:
        kafkaConsumer.seek(topicPartition, requestedOffset);
        checkpointManager.setFetchOffset(job, requestedOffset);
        break;
      case SEEK_TO_EARLIEST_OFFSET:
        kafkaConsumer.seekToBeginning(Collections.singleton(topicPartition));
        // Leaving the fetch offset untouched when the bound is unknown should be fine: it is
        // eventually overridden once messages are being processed.
        if (lowerBound != null) {
          checkpointManager.setFetchOffset(job, lowerBound);
        }
        break;
      case SEEK_TO_LATEST_OFFSET:
        kafkaConsumer.seekToEnd(Collections.singleton(topicPartition));
        // Leaving the fetch offset untouched when the bound is unknown should be fine: it is
        // eventually overridden once messages are being processed.
        if (upperBound != null) {
          checkpointManager.setFetchOffset(job, upperBound);
        }
        break;
      case DO_NOT_SEEK:
        {
          // Not seeking means the consumer continues from the committed offset. No seek is
          // needed, but the checkpoint offsets still have to be set.
          OffsetAndMetadata committed = kafkaConsumer.committed(topicPartition);
          if (committed == null) {
            handleNoCommittedOffsetWhenDoNotSeekPolicy(
                kafkaConsumer, checkpointManager, job, topicPartition);
          } else {
            checkpointManager.setCommittedOffset(job, committed.offset());
            checkpointManager.setFetchOffset(job, committed.offset());
          }
          break;
        }
      default:
        // do nothing
    }

    LOGGER.debug(
        "seek kafka consumer",
        StructuredLogging.kafkaGroup(
            pipelineStateManager.getJobTemplate().getKafkaConsumerTask().getConsumerGroup()),
        StructuredLogging.kafkaTopic(topicPartition.topic()),
        StructuredLogging.kafkaPartition(topicPartition.partition()),
        StructuredLogging.kafkaOffset(requestedOffset));
  }
}