in src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java [92:128]
/**
 * Turns a struct-valued sink record into the string form handed on by this service.
 *
 * <p>When the configured converter mode is {@code DEBEZIUM_INGESTION}, the record is validated,
 * wrapped in a {@link RecordDescriptor}, any required table changes are checked/applied for the
 * mapped table, and the field values are parsed from the "before" struct (delete events) or the
 * "after" struct (all other events). For every other converter mode the record value is
 * serialized through {@code converter} and decoded as UTF-8.
 *
 * @param record the sink record to process
 * @return the processed record string, or {@code null} when the descriptor reports a tombstone
 */
public String processStructRecord(SinkRecord record) {
    // Any mode other than Debezium ingestion: plain converter serialization.
    if (ConverterMode.DEBEZIUM_INGESTION != dorisOptions.getConverterMode()) {
        // NOTE(review): if the converter can return null bytes (e.g. for a null value),
        // the String constructor below would throw — confirm against the converter in use.
        byte[] serialized =
                converter.fromConnectData(record.topic(), record.valueSchema(), record.value());
        return new String(serialized, StandardCharsets.UTF_8);
    }

    validate(record);
    RecordDescriptor descriptor = buildRecordDescriptor(record);

    // Tombstones are logged and skipped; the caller receives null.
    if (descriptor.isTombstone()) {
        LOG.warn(
                "The record value and schema is null, will not process. recordOffset={}",
                record.kafkaOffset());
        return null;
    }

    String targetTable = dorisOptions.getTopicMapTable(descriptor.getTopicName());
    checkAndApplyTableChangesIfNeeded(targetTable, descriptor);

    List<String> valueFieldNames = descriptor.getNonKeyFieldNames();
    boolean deletion = descriptor.isDelete();

    // Delete events are parsed from the "before" image, everything else from "after".
    return parseFieldValues(
            descriptor,
            deletion ? descriptor.getBeforeStruct() : descriptor.getAfterStruct(),
            valueFieldNames,
            deletion);
}