in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java [230:251]
/**
 * Resolves the Kafka topic that the given row should be written to.
 *
 * <p>When exactly one topic is configured, that topic is returned and the row's topic metadata
 * is not read. Otherwise the topic is read from the row's writable {@code TOPIC} metadata and
 * validated against whichever of {@code topics} and {@code topicPattern} is non-null.
 *
 * @param element the row being serialized
 * @return the topic the record should be written to
 * @throws IllegalArgumentException if the row carries no topic, if the topic is not contained in
 *     the configured {@code topics}, or if it is rejected by the {@code topicPattern} check
 */
private String getTargetTopic(RowData element) {
    if (topics != null && topics.size() == 1) {
        // Exactly one configured topic: return it directly. This runs for every record, so
        // take the element from the iterator instead of building a Stream and an Optional.
        return topics.iterator().next();
    }

    final String targetTopic = readMetadata(element, KafkaDynamicSink.WritableMetadata.TOPIC);
    if (targetTopic == null) {
        throw new IllegalArgumentException(
                "The topic of the sink record is not valid. Expected a single topic but no topic is set.");
    }
    // Each configured restriction is applied only when it is present (non-null).
    if (topics != null && !topics.contains(targetTopic)) {
        throw new IllegalArgumentException(
                String.format(
                        "The topic of the sink record is not valid. Expected topic to be in: %s but was: %s",
                        topics, targetTopic));
    }
    if (topicPattern != null && !cachedTopicPatternMatch(targetTopic)) {
        throw new IllegalArgumentException(
                String.format(
                        "The topic of the sink record is not valid. Expected topic to match: %s but was: %s",
                        topicPattern, targetTopic));
    }
    return targetTopic;
}