in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java [124:152]
private static void verifyInitialOffset(
TopicPartition tp, Long startingOffset, long stoppingOffset) {
if (startingOffset == null) {
throw new FlinkRuntimeException(
"Cannot initialize starting offset for partition " + tp);
}
if (startingOffset < 0 && !VALID_STARTING_OFFSET_MARKERS.contains(startingOffset)) {
throw new FlinkRuntimeException(
String.format(
"Invalid starting offset %d is specified for partition %s. "
+ "It should either be non-negative or be one of the "
+ "[%d(earliest), %d(latest), %d(committed)].",
startingOffset, tp, LATEST_OFFSET, EARLIEST_OFFSET, COMMITTED_OFFSET));
}
if (stoppingOffset < 0 && !VALID_STOPPING_OFFSET_MARKERS.contains(stoppingOffset)) {
throw new FlinkRuntimeException(
String.format(
"Illegal stopping offset %d is specified for partition %s. "
+ "It should either be non-negative or be one of the "
+ "[%d(latest), %d(committed), %d(Long.MIN_VALUE, no_stopping_offset)].",
stoppingOffset,
tp,
LATEST_OFFSET,
COMMITTED_OFFSET,
NO_STOPPING_OFFSET));
}
}