in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java [131:173]
/**
 * Checks that the options accompanying the configured scan startup mode are present.
 *
 * <p>Nothing is checked when {@code SCAN_STARTUP_MODE} is not set, or when it is set to a mode
 * other than {@code TIMESTAMP} or {@code SPECIFIC_OFFSETS}.
 *
 * <ul>
 *   <li>{@code TIMESTAMP}: {@code SCAN_STARTUP_TIMESTAMP_MILLIS} must be set.
 *   <li>{@code SPECIFIC_OFFSETS}: {@code SCAN_STARTUP_SPECIFIC_OFFSETS} must be set, {@code
 *       isSingleTopic(tableOptions)} must hold, and the configured value is handed to {@code
 *       parseSpecificOffsets}.
 * </ul>
 *
 * @param tableOptions the table options to inspect
 * @throws ValidationException if an option required by the selected mode is missing, or if
 *     specific offsets are requested while {@code isSingleTopic(tableOptions)} is false
 */
private static void validateScanStartupMode(ReadableConfig tableOptions) {
    tableOptions
            .getOptional(SCAN_STARTUP_MODE)
            .ifPresent(
                    startupMode -> {
                        if (startupMode == ScanStartupMode.TIMESTAMP) {
                            final boolean timestampConfigured =
                                    tableOptions
                                            .getOptional(SCAN_STARTUP_TIMESTAMP_MILLIS)
                                            .isPresent();
                            if (timestampConfigured) {
                                return;
                            }
                            throw new ValidationException(
                                    String.format(
                                            "'%s' is required in '%s' startup mode"
                                                    + " but missing.",
                                            SCAN_STARTUP_TIMESTAMP_MILLIS.key(),
                                            ScanStartupMode.TIMESTAMP));
                        }

                        // Every remaining mode except SPECIFIC_OFFSETS has no companion option.
                        if (startupMode != ScanStartupMode.SPECIFIC_OFFSETS) {
                            return;
                        }

                        final boolean offsetsConfigured =
                                tableOptions
                                        .getOptional(SCAN_STARTUP_SPECIFIC_OFFSETS)
                                        .isPresent();
                        if (!offsetsConfigured) {
                            throw new ValidationException(
                                    String.format(
                                            "'%s' is required in '%s' startup mode"
                                                    + " but missing.",
                                            SCAN_STARTUP_SPECIFIC_OFFSETS.key(),
                                            ScanStartupMode.SPECIFIC_OFFSETS));
                        }

                        final boolean singleTopic = isSingleTopic(tableOptions);
                        if (!singleTopic) {
                            throw new ValidationException(
                                    "Currently Kafka source only supports specific offset for single topic.");
                        }

                        // The result is discarded; the call is presumably made so that a
                        // malformed offsets string is rejected here (confirm against
                        // parseSpecificOffsets).
                        parseSpecificOffsets(
                                tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS),
                                SCAN_STARTUP_SPECIFIC_OFFSETS.key());
                    });
}