public SchemaAndValue toConnectData()

in src/main/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverter.java [141:164]


    /**
     * Decodes an Avro-encoded record value using the schema registered for the given topic.
     *
     * @param topic the topic the record came from; used to look up the schema in
     *     {@code topic2SchemaMap}
     * @param value the raw record bytes; may be {@code null}
     * @return a {@link SchemaAndValue} carrying a new {@code DorisJsonSchema} and either the result
     *     of {@code parseAvroWithSchema}, or a {@code null} value when {@code value} is
     *     {@code null}
     * @throws DataDecodeException if no schema is available for {@code topic}, or if parsing the
     *     bytes fails with an {@link IOException}
     */
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        if (value == null) {
            LOG.warn("cast bytes is null");
            return new SchemaAndValue(new DorisJsonSchema(), null);
        }

        // Single lookup: a missing key and a null mapping are both treated as "no schema".
        Schema schema = topic2SchemaMap.get(topic);
        if (schema == null) {
            LOG.error("The avro schema file of {} is not provided.", topic);
            throw new DataDecodeException("The avro schema file of " + topic + " is not provided.");
        }

        // Hand the parser its own copy so the caller's array is never shared with the decoder.
        byte[] data = value.clone();
        try {
            return new SchemaAndValue(
                    new DorisJsonSchema(), parseAvroWithSchema(data, schema, schema));
        } catch (IOException e) {
            // Pass the exception as the last argument so the logger records the full stack trace.
            LOG.error("failed to parse AVRO record of topic {}", topic, e);
            throw new DataDecodeException("failed to parse AVRO record\n", e);
        }
    }