private static Map autoCompleteSchemaRegistrySubject()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java [613:637]


    private static Map<String, String> autoCompleteSchemaRegistrySubject(
            Map<String, String> options) {
        Configuration configuration = Configuration.fromMap(options);
        // the subject autoComplete should only be used in sink with a single topic, check the topic
        // option first
        validateTopic(configuration);
        if (configuration.contains(TOPIC) && isSingleTopic(configuration)) {
            final Optional<String> valueFormat = configuration.getOptional(VALUE_FORMAT);
            final Optional<String> keyFormat = configuration.getOptional(KEY_FORMAT);
            final Optional<String> format = configuration.getOptional(FORMAT);
            final String topic = configuration.get(TOPIC).get(0);

            if (format.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(format.get())) {
                autoCompleteSubject(configuration, format.get(), topic + "-value");
            } else if (valueFormat.isPresent()
                    && SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get())) {
                autoCompleteSubject(configuration, "value." + valueFormat.get(), topic + "-value");
            }

            if (keyFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(keyFormat.get())) {
                autoCompleteSubject(configuration, "key." + keyFormat.get(), topic + "-key");
            }
        }
        return configuration.toMap();
    }