in core/src/main/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransform.java [60:98]
public R apply(R r) {
LOG.debug("Incoming record: {}", r);
if (r.value() != null && r.valueSchema() != null && Schema.Type.STRUCT.equals(r.valueSchema().type())) {
GenericRecord avroGenericRecord = (GenericRecord)avroData.fromConnectData(r.valueSchema(), r.value());
if (avroGenericRecord == null) {
LOG.warn("No GenericRecord was converted as part of this transformation");
return r;
}
LOG.debug("GenericRecord created: {} \nwith schema: {}", avroGenericRecord, avroGenericRecord.getClass().getName());
GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(avroGenericRecord.getSchema());
Object pojo;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(avroGenericRecord, encoder);
encoder.flush();
byte[] avroDataBytes = out.toByteArray();
pojo = objectReader
.with(new AvroSchema(avroGenericRecord.getSchema()))
.readValue(avroDataBytes);
LOG.debug("Pojo of class {} created: {}", pojo.getClass(), pojo);
} catch (IOException e) {
throw new ConnectException("Error in generating POJO from Struct.", e);
}
LOG.debug("Generate pojo: {}", pojo);
return r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(),
null, pojo, r.timestamp());
} else {
LOG.debug("Incoming record with a null value or a value schema != Schema.Type.STRUCT, nothing to be done.");
return r;
}
}