in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java [509:541]
private void sanityCheck() {
// Check required configs.
for (String requiredConfig : REQUIRED_CONFIGS) {
checkNotNull(
props.getProperty(requiredConfig),
String.format("Property %s is required but not provided", requiredConfig));
}
// Check required settings.
checkNotNull(
subscriber,
"No subscribe mode is specified, "
+ "should be one of topics, topic pattern and partition set.");
checkNotNull(deserializationSchema, "Deserialization schema is required but not provided.");
// Check consumer group ID
checkState(
props.containsKey(ConsumerConfig.GROUP_ID_CONFIG) || !offsetCommitEnabledManually(),
String.format(
"Property %s is required when offset commit is enabled",
ConsumerConfig.GROUP_ID_CONFIG));
// Check offsets initializers
if (startingOffsetsInitializer instanceof OffsetsInitializerValidator) {
((OffsetsInitializerValidator) startingOffsetsInitializer).validate(props);
}
if (stoppingOffsetsInitializer instanceof OffsetsInitializerValidator) {
((OffsetsInitializerValidator) stoppingOffsetsInitializer).validate(props);
}
if (props.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
checkDeserializer(props.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
}
if (props.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
checkDeserializer(props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
}
}