private static void verifyInitialOffset()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java [123:151]


    private static void verifyInitialOffset(
            TopicPartition tp, Long startingOffset, long stoppingOffset) {
        if (startingOffset == null) {
            throw new FlinkRuntimeException(
                    "Cannot initialize starting offset for partition " + tp);
        }

        if (startingOffset < 0 && !VALID_STARTING_OFFSET_MARKERS.contains(startingOffset)) {
            throw new FlinkRuntimeException(
                    String.format(
                            "Invalid starting offset %d is specified for partition %s. "
                                    + "It should either be non-negative or be one of the "
                                    + "[%d(earliest), %d(latest), %d(committed)].",
                            startingOffset, tp, LATEST_OFFSET, EARLIEST_OFFSET, COMMITTED_OFFSET));
        }

        if (stoppingOffset < 0 && !VALID_STOPPING_OFFSET_MARKERS.contains(stoppingOffset)) {
            throw new FlinkRuntimeException(
                    String.format(
                            "Illegal stopping offset %d is specified for partition %s. "
                                    + "It should either be non-negative or be one of the "
                                    + "[%d(latest), %d(committed), %d(Long.MIN_VALUE, no_stopping_offset)].",
                            stoppingOffset,
                            tp,
                            LATEST_OFFSET,
                            COMMITTED_OFFSET,
                            NO_STOPPING_OFFSET));
        }
    }