in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java [175:216]
/**
 * Validates the options that accompany the bounded scan mode, if one is configured.
 *
 * <p>When {@code SCAN_BOUNDED_MODE} is not set, nothing is checked. Otherwise:
 *
 * <ul>
 *   <li>{@code TIMESTAMP} requires {@code SCAN_BOUNDED_TIMESTAMP_MILLIS} to be set.
 *   <li>{@code SPECIFIC_OFFSETS} requires {@code SCAN_BOUNDED_SPECIFIC_OFFSETS} to be set and
 *       {@code isSingleTopic(tableOptions)} to hold; the configured offsets string is then
 *       passed to {@code parseSpecificOffsets} (presumably to reject malformed values).
 *   <li>Any other mode passes without further checks.
 * </ul>
 *
 * @param tableOptions the table options to validate
 * @throws ValidationException if an option required by the selected mode is missing, or if
 *     specific offsets are requested while the table is not a single-topic table
 */
static void validateScanBoundedMode(ReadableConfig tableOptions) {
    // Shared by both "required option is missing" failures below.
    final String missingOptionTemplate = "'%s' is required in '%s' bounded mode but missing.";

    tableOptions
            .getOptional(SCAN_BOUNDED_MODE)
            .ifPresent(
                    boundedMode -> {
                        switch (boundedMode) {
                            case TIMESTAMP:
                                {
                                    boolean timestampGiven =
                                            tableOptions
                                                    .getOptional(SCAN_BOUNDED_TIMESTAMP_MILLIS)
                                                    .isPresent();
                                    if (timestampGiven) {
                                        break;
                                    }
                                    throw new ValidationException(
                                            String.format(
                                                    missingOptionTemplate,
                                                    SCAN_BOUNDED_TIMESTAMP_MILLIS.key(),
                                                    ScanBoundedMode.TIMESTAMP));
                                }
                            case SPECIFIC_OFFSETS:
                                {
                                    boolean offsetsGiven =
                                            tableOptions
                                                    .getOptional(SCAN_BOUNDED_SPECIFIC_OFFSETS)
                                                    .isPresent();
                                    if (!offsetsGiven) {
                                        throw new ValidationException(
                                                String.format(
                                                        missingOptionTemplate,
                                                        SCAN_BOUNDED_SPECIFIC_OFFSETS.key(),
                                                        ScanBoundedMode.SPECIFIC_OFFSETS));
                                    }
                                    if (!isSingleTopic(tableOptions)) {
                                        throw new ValidationException(
                                                "Currently Kafka source only supports specific offset for single topic.");
                                    }
                                    // Only the parsing side effect matters here (it may throw);
                                    // the parsed result is intentionally discarded.
                                    parseSpecificOffsets(
                                            tableOptions.get(SCAN_BOUNDED_SPECIFIC_OFFSETS),
                                            SCAN_BOUNDED_SPECIFIC_OFFSETS.key());
                                    break;
                                }
                        }
                    });
}