in src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java [269:298]
private String parseFieldValues(
RecordDescriptor record, Struct source, List<String> fields, boolean isDelete) {
Map<String, Object> filedMapping = new LinkedHashMap<>();
String filedResult = null;
for (String fieldName : fields) {
final RecordDescriptor.FieldDescriptor field = record.getFields().get(fieldName);
Type type = field.getType();
Object value =
field.getSchema().isOptional()
? source.getWithoutDefault(fieldName)
: source.get(fieldName);
Object convertValue = type.getValue(value, field.getSchema());
if (Objects.nonNull(convertValue) && !type.isNumber()) {
filedMapping.put(fieldName, convertValue.toString());
} else {
filedMapping.put(fieldName, convertValue);
}
}
try {
if (isDelete) {
filedMapping.put(LoadConstants.DORIS_DELETE_SIGN, LoadConstants.DORIS_DEL_TRUE);
} else {
filedMapping.put(LoadConstants.DORIS_DELETE_SIGN, LoadConstants.DORIS_DEL_FALSE);
}
filedResult = MAPPER.writeValueAsString(filedMapping);
} catch (JsonProcessingException e) {
LOG.error("parse record failed, cause by parse json error: {}", filedMapping);
}
return filedResult;
}