in src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java [270:303]
/**
 * Parses a comma-separated list of {@code topic:table} pairs into a topic-to-table map.
 *
 * <p>Each entry must contain exactly one {@code ':'} with a non-blank topic on the left and a
 * non-blank table on the right; surrounding whitespace on both names is trimmed. Example:
 * {@code "topic1:table1, topic2:table2"}.
 *
 * <p>Note the two distinct failure modes: a malformed entry is reported by returning
 * {@code null}, while a duplicated topic is reported by throwing.
 *
 * @param input the raw config value to parse
 * @return the parsed mapping, or {@code null} if {@code input} is {@code null} or any entry is
 *     malformed (the problem is logged at error level)
 * @throws DorisException if the same topic name appears in more than one entry
 */
public static Map<String, String> parseTopicToTableMap(String input) {
    if (input == null) {
        // Treat a missing value like any other malformed value instead of failing with an NPE.
        LOG.error(
                "Invalid {} config format: {}",
                DorisSinkConnectorConfig.TOPICS_TABLES_MAP,
                input);
        return null;
    }

    Map<String, String> topic2Table = new HashMap<>();
    boolean hasDuplicateTopic = false;
    for (String entry : input.split(",")) {
        String[] parts = entry.split(":");
        if (parts.length != 2 || parts[0].trim().isEmpty() || parts[1].trim().isEmpty()) {
            LOG.error(
                    "Invalid {} config format: {}",
                    DorisSinkConnectorConfig.TOPICS_TABLES_MAP,
                    input);
            return null;
        }
        String topic = parts[0].trim();
        String table = parts[1].trim();

        // Table names are never null here, so a non-null previous value means the topic was
        // already mapped. Keep scanning so that every duplicate is logged before failing.
        if (topic2Table.put(topic, table) != null) {
            LOG.error("topic name {} is duplicated", topic);
            hasDuplicateTopic = true;
        }
    }
    if (hasDuplicateTopic) {
        throw new DorisException("Failed to parse topic2table map");
    }
    return topic2Table;
}