in statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaTopicPartition.java [28:58]
/**
 * Parses a textual {@code <topic>/<partition_id>} definition into a {@link KafkaTopicPartition}.
 *
 * <p>The input is split on its <em>last</em> {@code '/'}; everything before it is taken as the
 * topic (so the topic part may itself contain slashes) and everything after it is parsed as the
 * partition id.
 *
 * @param topicAndPartition the definition to parse, in {@code <topic>/<partition_id>} form.
 * @return the topic partition described by the given string.
 * @throws NullPointerException if {@code topicAndPartition} is {@code null}.
 * @throws IllegalArgumentException if there is no separator, if the topic or the partition part
 *     is empty, or if the partition part is not a non-negative integer.
 */
public static KafkaTopicPartition fromString(String topicAndPartition) {
  Objects.requireNonNull(topicAndPartition);

  // Locate the separator; an index of -1 (absent) or 0 (leading slash) leaves no topic name.
  final int separatorIndex = topicAndPartition.lastIndexOf('/');
  final boolean topicPresent = separatorIndex > 0;
  final boolean partitionPresent = separatorIndex != topicAndPartition.length() - 1;
  if (!topicPresent || !partitionPresent) {
    throw new IllegalArgumentException(
        topicAndPartition + " does not conform to the <topic>/<partition_id> format");
  }

  final String topicName = topicAndPartition.substring(0, separatorIndex);
  final String rawPartition = topicAndPartition.substring(separatorIndex + 1);

  // Both failure modes below (not a number, negative number) report the same message.
  final String invalidPartitionMessage =
      "Invalid topic partition definition: "
          + topicAndPartition
          + "; partition id is expected to be an integer with value between 0 and "
          + Integer.MAX_VALUE;

  final int partition;
  try {
    partition = Integer.parseInt(rawPartition);
  } catch (NumberFormatException e) {
    // Keep the parse failure as the cause so the offending text is not lost.
    throw new IllegalArgumentException(invalidPartitionMessage, e);
  }

  if (partition < 0) {
    throw new IllegalArgumentException(invalidPartitionMessage);
  }

  return new KafkaTopicPartition(topicName, partition);
}