public R apply()

in core/src/main/java/org/apache/camel/kafkaconnector/transforms/SourcePojoToSchemaAndStructTransform.java [53:97]


    /**
     * Replaces the record's POJO value with a Kafka Connect schema and value pair.
     *
     * <p>An Avro schema and a matching object writer are generated from the
     * value's class and cached under the class name. The value is then written
     * as Avro binary, read back as a {@code GenericRecord} and converted to
     * Connect data. A record whose value is {@code null} is returned unchanged.
     *
     * @param r the incoming record
     * @return a new record carrying the converted value schema and value, or
     *         {@code r} itself when its value is {@code null}
     * @throws ConnectException if schema generation fails, or if an
     *         {@link IOException} occurs while serializing or decoding the value
     */
    public R apply(R r) {
        LOG.debug("Incoming record: {}", r);

        // Guard clause: without a value there is nothing to convert.
        if (r.value() == null) {
            LOG.debug("Incoming record with a null value, nothing to be done.");
            return r;
        }

        final Object pojo = r.value();
        final Class<?> pojoType = pojo.getClass();

        // The mapping function only runs on a cache miss for this class name.
        CacheEntry cached = avroSchemaWrapperCache.computeIfAbsent(pojoType.getName(), className -> {
            AvroSchemaGenerator schemaGenerator = new AvroSchemaGenerator();
            try {
                MAPPER.acceptJsonFormatVisitor(pojoType, schemaGenerator);
                AvroSchema generated = schemaGenerator.getGeneratedSchema();
                LOG.debug("Generated and cached avro schema: {}", generated.getAvroSchema().toString(true));
                return new CacheEntry(generated, MAPPER.writer(generated));
            } catch (Exception e) {
                throw new ConnectException("Error in generating POJO schema.", e);
            }
        });

        final SchemaAndValue converted;
        try {
            // Round-trip through Avro binary: POJO -> bytes -> GenericRecord.
            byte[] serialized = cached.getObjectWriter().writeValueAsBytes(pojo);
            Decoder binaryDecoder = DecoderFactory.get().binaryDecoder(serialized, null);
            org.apache.avro.Schema schema = cached.getAvroSchemaWrapper().getAvroSchema();
            DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            GenericRecord avroRecord = reader.read(null, binaryDecoder);

            converted = this.avroData.toConnectData(schema, avroRecord);
        } catch (IOException e) {
            throw new ConnectException("Error in generating POJO Struct.", e);
        }

        LOG.debug("Generate kafka connect schema: {}", converted.schema());
        LOG.debug("Generate kafka connect value (as Struct): {}", converted.value());

        // Key, topic, partition and timestamp are carried over; only the value side changes.
        return r.newRecord(
                r.topic(),
                r.kafkaPartition(),
                r.keySchema(),
                r.key(),
                converted.schema(),
                converted.value(),
                r.timestamp());
    }