private String getTargetTopic()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java [230:251]


    private String getTargetTopic(RowData element) {
        if (topics != null && topics.size() == 1) {
            // If topics is a singleton list, we only return the provided topic.
            return topics.stream().findFirst().get();
        }
        final String targetTopic = readMetadata(element, KafkaDynamicSink.WritableMetadata.TOPIC);
        if (targetTopic == null) {
            throw new IllegalArgumentException(
                    "The topic of the sink record is not valid. Expected a single topic but no topic is set.");
        } else if (topics != null && !topics.contains(targetTopic)) {
            throw new IllegalArgumentException(
                    String.format(
                            "The topic of the sink record is not valid. Expected topic to be in: %s but was: %s",
                            topics, targetTopic));
        } else if (topicPattern != null && !cachedTopicPatternMatch(targetTopic)) {
            throw new IllegalArgumentException(
                    String.format(
                            "The topic of the sink record is not valid. Expected topic to match: %s but was: %s",
                            topicPattern, targetTopic));
        }
        return targetTopic;
    }