private void sanityCheck()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java [509:541]


    private void sanityCheck() {
        // Check required configs.
        for (String requiredConfig : REQUIRED_CONFIGS) {
            checkNotNull(
                    props.getProperty(requiredConfig),
                    String.format("Property %s is required but not provided", requiredConfig));
        }
        // Check required settings.
        checkNotNull(
                subscriber,
                "No subscribe mode is specified, "
                        + "should be one of topics, topic pattern and partition set.");
        checkNotNull(deserializationSchema, "Deserialization schema is required but not provided.");
        // Check consumer group ID
        checkState(
                props.containsKey(ConsumerConfig.GROUP_ID_CONFIG) || !offsetCommitEnabledManually(),
                String.format(
                        "Property %s is required when offset commit is enabled",
                        ConsumerConfig.GROUP_ID_CONFIG));
        // Check offsets initializers
        if (startingOffsetsInitializer instanceof OffsetsInitializerValidator) {
            ((OffsetsInitializerValidator) startingOffsetsInitializer).validate(props);
        }
        if (stoppingOffsetsInitializer instanceof OffsetsInitializerValidator) {
            ((OffsetsInitializerValidator) stoppingOffsetsInitializer).validate(props);
        }
        if (props.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
            checkDeserializer(props.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
        }
        if (props.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
            checkDeserializer(props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
        }
    }