in src/main/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverter.java [141:164]
public SchemaAndValue toConnectData(String topic, byte[] value) {
if (value == null) {
LOG.warn("cast bytes is null");
return new SchemaAndValue(new DorisJsonSchema(), null);
}
if (topic2SchemaMap.containsKey(topic)) {
Schema schema = topic2SchemaMap.get(topic);
ByteBuffer buffer = ByteBuffer.wrap(value);
int length = buffer.limit();
byte[] data = new byte[length];
buffer.get(data, 0, length);
try {
return new SchemaAndValue(
new DorisJsonSchema(), parseAvroWithSchema(data, schema, schema));
} catch (IOException e) {
LOG.error("failed to parse AVRO record\n" + e);
throw new DataDecodeException("failed to parse AVRO record\n", e);
}
} else {
LOG.error("The avro schema file of {} is not provided.", topic);
throw new DataDecodeException("The avro schema file of " + topic + " is not provided.");
}
}