in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java [613:637]
private static Map<String, String> autoCompleteSchemaRegistrySubject(
Map<String, String> options) {
Configuration configuration = Configuration.fromMap(options);
// the subject autoComplete should only be used in sink with a single topic, check the topic
// option first
validateTopic(configuration);
if (configuration.contains(TOPIC) && isSingleTopic(configuration)) {
final Optional<String> valueFormat = configuration.getOptional(VALUE_FORMAT);
final Optional<String> keyFormat = configuration.getOptional(KEY_FORMAT);
final Optional<String> format = configuration.getOptional(FORMAT);
final String topic = configuration.get(TOPIC).get(0);
if (format.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(format.get())) {
autoCompleteSubject(configuration, format.get(), topic + "-value");
} else if (valueFormat.isPresent()
&& SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get())) {
autoCompleteSubject(configuration, "value." + valueFormat.get(), topic + "-value");
}
if (keyFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(keyFormat.get())) {
autoCompleteSubject(configuration, "key." + keyFormat.get(), topic + "-key");
}
}
return configuration.toMap();
}