in core/src/main/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransform.java [53:97]
public R apply(R r) {
    // Replaces the record's POJO value with a Kafka Connect schema and value derived from it.
    // The value is serialized to Avro binary using a schema/writer pair cached per value class
    // name, decoded back into an Avro GenericRecord, and then converted through avroData.
    // A record whose value is null is returned as-is. Schema generation failures and
    // IOExceptions during conversion are rethrown as ConnectException with the cause attached.
    LOG.debug("Incoming record: {}", r);

    if (r.value() == null) {
        LOG.debug("Incoming record with a null value, nothing to be done.");
        return r;
    }

    String valueClassName = r.value().getClass().getName();

    // Only invoked by the cache on a miss: builds the Avro schema for the value's class and
    // pairs it with a writer configured for that schema.
    Function<String, CacheEntry> entryLoader = missingKey -> {
        AvroSchemaGenerator schemaGenerator = new AvroSchemaGenerator();
        try {
            MAPPER.acceptJsonFormatVisitor(r.value().getClass(), schemaGenerator);
            AvroSchema generatedSchema = schemaGenerator.getGeneratedSchema();
            LOG.debug("Generated and cached avro schema: {}", generatedSchema.getAvroSchema().toString(true));
            return new CacheEntry(generatedSchema, MAPPER.writer(generatedSchema));
        } catch (Exception e) {
            throw new ConnectException("Error in generating POJO schema.", e);
        }
    };
    CacheEntry cached = avroSchemaWrapperCache.computeIfAbsent(valueClassName, entryLoader);

    final SchemaAndValue converted;
    try {
        // POJO -> Avro binary -> GenericRecord -> Connect schema and value.
        byte[] avroBytes = cached.getObjectWriter().writeValueAsBytes(r.value());
        Decoder binaryDecoder = DecoderFactory.get().binaryDecoder(avroBytes, null);
        org.apache.avro.Schema avroSchema = cached.getAvroSchemaWrapper().getAvroSchema();
        DatumReader<GenericRecord> recordReader = new GenericDatumReader<>(avroSchema);
        GenericRecord avroRecord = recordReader.read(null, binaryDecoder);
        converted = this.avroData.toConnectData(avroSchema, avroRecord);
    } catch (IOException e) {
        throw new ConnectException("Error in generating POJO Struct.", e);
    }

    LOG.debug("Generate kafka connect schema: {}", converted.schema());
    LOG.debug("Generate kafka connect value (as Struct): {}", converted.value());

    // Only the value schema and value change; topic, partition, key and timestamp are passed through.
    return r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(),
            converted.schema(), converted.value(), r.timestamp());
}