in src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java [197:213]
/**
 * Reads the value schema of the given sink record and passes the schema that describes the
 * non-key fields to {@code applyNonKeyFields}.
 *
 * <p>Records without a value schema are skipped and nothing is applied. For a flattened event
 * the value schema itself is used; otherwise the schema of its {@code after} field is used.
 *
 * @param record the sink record whose value schema is inspected
 * @param flattened {@code true} if the record is a flattened event, {@code false} if the
 *     value schema is expected to carry an {@code after} field
 * @throws ConnectException if the event is not flattened and its value schema has no
 *     {@code after} field
 */
private void readSinkRecordNonKeyData(SinkRecord record, boolean flattened) {
    final Schema schema = record.valueSchema();
    if (schema == null) {
        // No value schema on this record, so there is nothing to read.
        return;
    }

    if (flattened) {
        // A flattened event is not a complex Debezium message type, so the field
        // names can be taken straight from the value schema.
        applyNonKeyFields(schema);
        return;
    }

    final Field afterField = schema.field("after");
    if (afterField != null) {
        applyNonKeyFields(afterField.schema());
        return;
    }

    throw new ConnectException(
            "Received an unexpected message type that does not have an 'after' Debezium block");
}