in debezium/src/main/java/org/apache/camel/example/debezium/KinesisProducerToCassandra.java [53:108]
/**
 * Creates the route that reads change events from the configured Kinesis stream and applies them
 * to the {@code products} table through the Camel {@code cql} endpoint.
 *
 * <p>Each message body is converted to a String, unmarshalled from JSON into a Map, and the value
 * of its {@code operation} entry is stored in the {@code DBOperation} exchange property:
 * <ul>
 *   <li>{@code c} / {@code u} (insert / update): the body becomes the parameter list
 *       {@code [name, description, weight, id]} and the update (upsert) statement is selected;</li>
 *   <li>{@code d} (delete): the body becomes {@code [id]} and the delete statement is selected;</li>
 *   <li>any other operation is not sent to Cassandra.</li>
 * </ul>
 *
 * @return the route builder holding the Kinesis-to-Cassandra route
 */
static RouteBuilder createRouteBuilder() {
    return new RouteBuilder() {
        @Override
        public void configure() {
            // CQL templates; note that an UPDATE in Cassandra is an UPSERT, which is what we need
            // to cover both inserts and updates with a single statement.
            final String cqlUpdate = "update products set name = ?, description = ?, weight = ? where id = ?";
            final String cqlDelete = "delete from products where id = ?";

            // Every option after the first must be separated with '&'; the region option in
            // particular has to be spelled "&region=" so that it is parsed as its own parameter.
            from("aws2-kinesis:{{kinesis.streamName}}?accessKey=RAW({{kinesis.accessKey}})"
                    + "&secretKey=RAW({{kinesis.secretKey}})"
                    + "&region={{kinesis.region}}")
                    // The record payload is expected to be binary, so convert it to a String first
                    // in order to unmarshal it from JSON afterwards.
                    .convertBodyTo(String.class)
                    // Unmarshal the JSON body into a Map
                    .unmarshal().json(JsonLibrary.Jackson)
                    // The body is replaced below, so keep the operation in an exchange property
                    // in order not to lose it.
                    .setProperty("DBOperation", simple("${body[operation]}"))
                    .choice()
                        // INSERT or UPDATE: the body must hold the CQL query parameters, in the
                        // same order as the placeholders of the update statement.
                        .when(exchangeProperty("DBOperation").in("c", "u"))
                            .setBody(exchange -> {
                                final Map<?, ?> event = (Map<?, ?>) exchange.getMessage().getBody();
                                final Map<?, ?> value = (Map<?, ?>) event.get("value");
                                final Map<?, ?> key = (Map<?, ?>) event.get("key");
                                // Missing columns fall back to an empty string / zero weight
                                final String name = value.get("name") != null ? value.get("name").toString() : "";
                                final String description = value.get("description") != null ? value.get("description").toString() : "";
                                final float weight = value.get("weight") != null ? Float.parseFloat(value.get("weight").toString()) : 0;
                                return Arrays.asList(name, description, weight, key.get("id"));
                            })
                            // Select the query through a header so a single endpoint call can serve both cases
                            .setHeader("CQLQuery", constant(cqlUpdate))
                        // DELETE: only the id is needed as a query parameter
                        .when(exchangeProperty("DBOperation").isEqualTo("d"))
                            .setBody(exchange -> {
                                final Map<?, ?> event = (Map<?, ?>) exchange.getMessage().getBody();
                                final Map<?, ?> key = (Map<?, ?>) event.get("key");
                                return Collections.singletonList(key.get("id"));
                            })
                            // Select the query through a header so a single endpoint call can serve both cases
                            .setHeader("CQLQuery", constant(cqlDelete))
                    .end()
                    .choice()
                        // Make sure we ONLY handle INSERT, UPDATE and DELETE and nothing else
                        .when(exchangeProperty("DBOperation").in("c", "u", "d"))
                            // Send the selected query to Cassandra
                            .recipientList(simple("cql:{{cassandra.node}}/{{cassandra.keyspace}}?cql=RAW(${header.CQLQuery})"))
                    .end();
        }
    };
}