in core/src/main/java/org/apache/camel/kafkaconnector/transforms/SchemaAndStructToJsonTransform.java [39:58]
public R apply(R r) {
    // Converts the record's value to JSON bytes via jsonConverter and returns a new record
    // carrying those bytes under Schema.BYTES_SCHEMA. Topic, partition, key schema, key and
    // timestamp are copied from the incoming record. The incoming record is returned
    // untouched when it has no value, no value schema, or when the converter yields null.
    LOG.debug("Incoming record: {}", r);

    if (r.value() == null || r.valueSchema() == null) {
        LOG.debug("Incoming record with a null value or a null schema, nothing to be done.");
        return r;
    }

    final byte[] json = jsonConverter.fromConnectData(r.topic(), r.valueSchema(), r.value());
    if (json == null) {
        LOG.warn("No record was converted as part of this transformation, resulting json byte[] was null.");
        return r;
    }

    // Only materialize the String copy of the payload when it will actually be logged.
    if (LOG.isDebugEnabled()) {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        // NOTE(review): assumes jsonConverter emits UTF-8 encoded JSON - confirm against its type.
        LOG.debug("Json created: {}", new String(json, java.nio.charset.StandardCharsets.UTF_8));
    }

    return r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(),
            Schema.BYTES_SCHEMA, json, r.timestamp());
}