static RouteBuilder createRouteBuilder()

in debezium/src/main/java/org/apache/camel/example/debezium/KinesisProducerToCassandra.java [53:108]


    /**
     * Creates the route that reads change events from an AWS Kinesis stream and applies them to the
     * Cassandra {@code products} table.
     *
     * <p>Each message body is converted to a String and unmarshalled from JSON into a {@link Map}; its
     * {@code operation} entry is stored in the {@code DBOperation} exchange property. For the {@code c}
     * and {@code u} operations the body is replaced by the parameters of the update statement, for
     * {@code d} by the parameter of the delete statement. The selected statement is carried in the
     * {@code CQLQuery} header and is sent to the {@code cql} endpoint only for those three operations;
     * any other operation is not forwarded.
     *
     * @return a route builder holding the Kinesis-to-Cassandra route
     */
    static RouteBuilder createRouteBuilder() {
        return new RouteBuilder() {
            @Override
            public void configure() {
                // CQL templates. Note that an UPDATE in Cassandra is an UPSERT, so a single statement
                // covers both inserts and updates.
                final String cqlUpdate = "update products set name = ?, description = ?, weight = ? where id = ?";
                final String cqlDelete = "delete from products where id = ?";

                from("aws2-kinesis:{{kinesis.streamName}}?accessKey=RAW({{kinesis.accessKey}})"
                        + "&secretKey=RAW({{kinesis.secretKey}})"
                        + "&region={{kinesis.region}}")
                        // Convert the incoming body to a String so that it can be unmarshalled
                        // from JSON to a Map in the next step
                        .convertBodyTo(String.class)
                        // Unmarshal the body from JSON to a Map
                        .unmarshal().json(JsonLibrary.Jackson)
                        // Keep the operation in an exchange property: the body is replaced below, so it
                        // would otherwise be lost
                        .setProperty("DBOperation", simple("${body[operation]}"))
                        .choice()
                        // For an INSERT or UPDATE, replace the body with the CQL query parameters
                        // expected by the camel-cassandraql component
                        .when(exchangeProperty("DBOperation").in("c", "u"))
                        .setBody(exchange -> {
                            final Map<?, ?> event = (Map<?, ?>) exchange.getMessage().getBody();
                            // Fail with an explicit message instead of an anonymous NullPointerException
                            // when the event lacks one of the sections read below
                            final Map<?, ?> value = (Map<?, ?>) java.util.Objects.requireNonNull(
                                    event.get("value"), "change event has no 'value' entry");
                            final Map<?, ?> key = (Map<?, ?>) java.util.Objects.requireNonNull(
                                    event.get("key"), "change event has no 'key' entry");

                            // Read each column once; missing columns fall back to a default
                            final Object rawName = value.get("name");
                            final Object rawDescription = value.get("description");
                            final Object rawWeight = value.get("weight");

                            final String name = rawName != null ? rawName.toString() : "";
                            final String description = rawDescription != null ? rawDescription.toString() : "";
                            final float weight = rawWeight != null ? Float.parseFloat(rawWeight.toString()) : 0;
                            final Object id = key.get("id");

                            // Order must match the placeholders of cqlUpdate
                            return Arrays.asList(name, description, weight, id);
                        })
                        // Select the statement through a header so a single endpoint call serves every operation
                        .setHeader("CQLQuery", constant(cqlUpdate))
                        // For a DELETE, the id is the only query parameter
                        .when(exchangeProperty("DBOperation").isEqualTo("d"))
                        .setBody(exchange -> {
                            final Map<?, ?> event = (Map<?, ?>) exchange.getMessage().getBody();
                            final Map<?, ?> key = (Map<?, ?>) java.util.Objects.requireNonNull(
                                    event.get("key"), "change event has no 'key' entry");
                            final Object id = key.get("id");

                            return Collections.singletonList(id);
                        })
                        // Select the statement through a header so a single endpoint call serves every operation
                        .setHeader("CQLQuery", constant(cqlDelete))
                        .end()
                        .choice()
                        // Only INSERT, UPDATE and DELETE are forwarded; anything else is not sent
                        .when(exchangeProperty("DBOperation").in("c", "u", "d"))
                        // Send the query to Cassandra
                        .recipientList(simple("cql:{{cassandra.node}}/{{cassandra.keyspace}}?cql=RAW(${header.CQLQuery})"))
                        .end();
            }
        };
    }