in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java [415:452]
/**
 * Parses a specific-offsets option value into a map from partition id to start offset.
 *
 * <p>The expected format is a {@code ;}-separated list of pairs, each pair being
 * {@code partition:<int>,offset:<long>}, for example
 * {@code partition:0,offset:42;partition:1,offset:300}. If the same partition appears more
 * than once, the last occurrence wins.
 *
 * @param specificOffsetsStr the raw option value to parse
 * @param optionKey the option key, only used to build the error message
 * @return a mutable map of partition id to offset, one entry per distinct partition
 * @throws ValidationException if the value is {@code null}, contains no pairs, contains a pair
 *     that does not consist of exactly a partition part followed by an offset part, or
 *     contains a partition/offset that is not a valid number
 */
public static Map<Integer, Long> parseSpecificOffsets(
        String specificOffsetsStr, String optionKey) {
    final String validationExceptionMessage =
            String.format(
                    "Invalid properties '%s' should follow the format "
                            + "'partition:0,offset:42;partition:1,offset:300', but is '%s'.",
                    optionKey, specificOffsetsStr);

    // Report a missing value with the same descriptive error as any other malformed input,
    // instead of letting split() fail with a bare NullPointerException.
    if (specificOffsetsStr == null) {
        throw new ValidationException(validationExceptionMessage);
    }

    final String[] pairs = specificOffsetsStr.split(";");
    // split() drops trailing empty strings, so a value made only of ';' yields no pairs.
    if (pairs.length == 0) {
        throw new ValidationException(validationExceptionMessage);
    }

    final Map<Integer, Long> offsetMap = new HashMap<>();
    for (String pair : pairs) {
        // An empty pair, or one without ',', splits into fewer than two tokens and is
        // therefore rejected by the length check below.
        final String[] kv = pair.split(",");
        if (kv.length != 2
                || !kv[0].startsWith(PARTITION + ':')
                || !kv[1].startsWith(OFFSET + ':')) {
            throw new ValidationException(validationExceptionMessage);
        }

        // Everything after the first ':' is the numeric part of each token.
        final String partitionValue = kv[0].substring(kv[0].indexOf(':') + 1);
        final String offsetValue = kv[1].substring(kv[1].indexOf(':') + 1);
        try {
            final Integer partition = Integer.valueOf(partitionValue);
            final Long offset = Long.valueOf(offsetValue);
            offsetMap.put(partition, offset);
        } catch (NumberFormatException e) {
            // Keep the parse failure as the cause so the offending number is visible.
            throw new ValidationException(validationExceptionMessage, e);
        }
    }
    return offsetMap;
}