in pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/ConsumerConnector.java [208:224]
/**
 * Seeks the given consumer to the position selected by {@code strategy}.
 *
 * <p>When {@code strategy} is {@code null} nothing is logged and no seek is issued.
 * {@link SubscriptionInitialPosition#Earliest} seeks to {@link MessageId#earliest}; any other
 * non-null value seeks to {@link MessageId#latest}. A {@link PulsarClientException} thrown by
 * the seek is logged at warn level and not propagated to the caller.
 *
 * @param consumer consumer on which {@code seek} is invoked
 * @param strategy position to seek to, or {@code null} to leave the consumer untouched
 */
private void resetOffsets(Consumer<byte[]> consumer, SubscriptionInitialPosition strategy) {
    if (strategy != null) {
        log.info("Resetting partition {} for group-id {} and seeking to {} position", consumer.getTopic(),
                consumer.getSubscription(), strategy);
        try {
            // Only Earliest maps to the start of the topic; everything else falls through to latest.
            final MessageId target = (strategy == SubscriptionInitialPosition.Earliest)
                    ? MessageId.earliest
                    : MessageId.latest;
            consumer.seek(target);
        } catch (PulsarClientException seekFailure) {
            // Best effort: report the failure (with stack trace) and carry on.
            log.warn("Failed to reset offset for consumer {} to {}, {}", consumer.getTopic(), strategy,
                    seekFailure.getMessage(), seekFailure);
        }
    }
}