public String processStructRecord()

in src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java [92:128]


    public String processStructRecord(SinkRecord record) {
        String processedRecord;
        if (ConverterMode.DEBEZIUM_INGESTION == dorisOptions.getConverterMode()) {
            validate(record);
            RecordDescriptor recordDescriptor = buildRecordDescriptor(record);
            if (recordDescriptor.isTombstone()) {
                LOG.warn(
                        "The record value and schema is null, will not process. recordOffset={}",
                        record.kafkaOffset());
                return null;
            }
            String tableName = dorisOptions.getTopicMapTable(recordDescriptor.getTopicName());
            checkAndApplyTableChangesIfNeeded(tableName, recordDescriptor);

            List<String> nonKeyFieldNames = recordDescriptor.getNonKeyFieldNames();
            if (recordDescriptor.isDelete()) {
                processedRecord =
                        parseFieldValues(
                                recordDescriptor,
                                recordDescriptor.getBeforeStruct(),
                                nonKeyFieldNames,
                                true);
            } else {
                processedRecord =
                        parseFieldValues(
                                recordDescriptor,
                                recordDescriptor.getAfterStruct(),
                                nonKeyFieldNames,
                                false);
            }
        } else {
            byte[] bytes =
                    converter.fromConnectData(record.topic(), record.valueSchema(), record.value());
            processedRecord = new String(bytes, StandardCharsets.UTF_8);
        }
        return processedRecord;
    }