public R apply()

in core/src/main/java/org/apache/camel/kafkaconnector/transforms/SinkPojoToSchemaAndStructTransform.java [60:98]


    /**
     * Replaces a STRUCT-typed record value with a POJO built from it.
     *
     * <p>The value is converted to an Avro {@code GenericRecord} through {@code avroData},
     * serialized with Avro binary encoding, and then read back through {@code objectReader}
     * configured with the record's Avro schema.
     *
     * @param r the incoming record
     * @return {@code r} itself when its value is null, its value schema is null or not of type
     *         STRUCT, or when the Avro conversion yields {@code null}; otherwise a new record with
     *         the same topic, partition, key schema, key and timestamp, a {@code null} value schema
     *         and the generated POJO as value
     * @throws ConnectException if an {@link IOException} occurs while encoding the Avro record or
     *         reading the POJO back
     */
    public R apply(R r) {
        LOG.debug("Incoming record: {}", r);

        // Guard clause: only non-null values described by a STRUCT schema are transformed.
        if (r.value() == null || r.valueSchema() == null || !Schema.Type.STRUCT.equals(r.valueSchema().type())) {
            LOG.debug("Incoming record with a null value or a value schema != Schema.Type.STRUCT, nothing to be done.");
            return r;
        }

        GenericRecord avroGenericRecord = (GenericRecord)avroData.fromConnectData(r.valueSchema(), r.value());

        if (avroGenericRecord == null) {
            LOG.warn("No GenericRecord was converted as part of this transformation");

            return r;
        }

        // Log the Avro schema itself: the message is labelled "with schema", so the record's
        // implementation class name would be misleading here.
        LOG.debug("GenericRecord created: {} \nwith schema: {}", avroGenericRecord, avroGenericRecord.getSchema());

        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(avroGenericRecord.getSchema());

        Object pojo;
        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            // Passing null as the reusable encoder asks the factory for a fresh one.
            Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            writer.write(avroGenericRecord, encoder);
            // The encoder must be flushed before the bytes are taken from the stream.
            encoder.flush();

            byte[] avroDataBytes = out.toByteArray();
            pojo = objectReader
                    .with(new AvroSchema(avroGenericRecord.getSchema()))
                    .readValue(avroDataBytes);
            LOG.debug("Pojo of class {} created: {}", pojo.getClass(), pojo);
        } catch (IOException e) {
            throw new ConnectException("Error in generating POJO from Struct.", e);
        }

        LOG.debug("Generate pojo: {}", pojo);
        // The new record carries the POJO with a null value schema.
        return r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(),
                null, pojo, r.timestamp());
    }