private static Map autoCompleteSchemaRegistrySubject()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java [636:656]


    private static Map<String, String> autoCompleteSchemaRegistrySubject(
            Map<String, String> options) {
        Configuration configuration = Configuration.fromMap(options);
        // the subject autoComplete should only be used in sink, check the topic first
        validateSinkTopic(configuration);
        final Optional<String> valueFormat = configuration.getOptional(VALUE_FORMAT);
        final Optional<String> keyFormat = configuration.getOptional(KEY_FORMAT);
        final Optional<String> format = configuration.getOptional(FORMAT);
        final String topic = configuration.get(TOPIC).get(0);

        if (format.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(format.get())) {
            autoCompleteSubject(configuration, format.get(), topic + "-value");
        } else if (valueFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get())) {
            autoCompleteSubject(configuration, "value." + valueFormat.get(), topic + "-value");
        }

        if (keyFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(keyFormat.get())) {
            autoCompleteSubject(configuration, "key." + keyFormat.get(), topic + "-key");
        }
        return configuration.toMap();
    }