in src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java [257:284]
private String parseRecordTableName(
String defaultTableName, String tableNameField, SinkRecord record) {
Object recordValue = record.value();
Map<String, Object> recordMap = Collections.emptyMap();
if (recordValue instanceof Struct) {
LOG.warn(
"The Struct type record not supported for The 'record.tablename.field' configuration, field={}",
tableNameField);
return defaultTableName;
} else if (recordValue instanceof Map) {
recordMap = (Map<String, Object>) recordValue;
} else if (recordValue instanceof String) {
try {
recordMap = objectMapper.readValue((String) recordValue, Map.class);
} catch (JsonProcessingException e) {
LOG.warn(
"The String type record failed to parse record value to map. record={}, field={}",
recordValue,
tableNameField,
e);
}
}
// if the field is not found in the record, use the table name in the config
if (!recordMap.containsKey(tableNameField)) {
return defaultTableName;
}
return recordMap.get(tableNameField).toString();
}