static RouteBuilder createRouteBuilder()

in debezium/src/main/java/org/apache/camel/example/debezium/DebeziumPgSQLConsumerToKinesis.java [55:111]


    static RouteBuilder createRouteBuilder() {
        return new RouteBuilder() {
            public void configure() {
                // Initial Debezium route that will run and listen to the changes,
                // first it will perform an initial snapshot using (select * from) in case no offset
                // exists for the connector, and then it will listen to postgres for any DB events such as (UPDATE, INSERT and DELETE)
                from("debezium-postgres:{{debezium.postgres.name}}?"
                        + "databaseHostname={{debezium.postgres.databaseHostName}}"
                        + "&databasePort={{debezium.postgres.databasePort}}"
                        + "&databaseUser={{debezium.postgres.databaseUser}}"
                        + "&databasePassword={{debezium.postgres.databasePassword}}"
                        + "&topicPrefix={{debezium.postgres.topicPrefix}}"
                        + "&databaseDbname={{debezium.postgres.databaseDbname}}"
                        + "&schemaIncludeList={{debezium.postgres.schemaIncludeList}}"
                        + "&tableIncludeList={{debezium.postgres.tableIncludeList}}"
                        + "&replicaIdentityAutosetValues={{debezium.postgres.replica.identity.autoset.values}}"
                        + "&additionalProperties.notification.enabled.channels=log"
                        + "&offsetStorageFileName={{debezium.postgres.offsetStorageFileName}}")
                        .routeId("FromDebeziumPgSql")
                        // We will need to prepare the data for Kinesis, however we need to mention here is that Kinesis is a bit different from Kafka in terms
                        // of the key partition which only limited to 256 byte length, depending on the size of your key, that may not be optimal. Therefore, the safer option is to hash the key
                        // and convert it to string, but that means we need to preserve the key information into the message body in order not to lose this information downstream.
                        // Note: If you'd use Kafka, most probably you will not need these transformations as you can send the key as an object and Kafka will do
                        // the rest to hash it in the broker in order to place it in the correct topic's partition.
                        .setBody(exchange -> {
                            // Using Camel Data Format, we can retrieve our data in Map since Debezium component has a Type Converter from Struct to Map, you need to specify the Map.class
                            // in order to convert the data from Struct to Map
                            final Map key = exchange.getMessage().getHeader(DebeziumConstants.HEADER_KEY, Map.class);
                            final Map value = exchange.getMessage().getBody(Map.class);
                            // Also, we need the operation in order to determine when an INSERT, UPDATE or DELETE happens
                            final String operation = (String) exchange.getMessage().getHeader(DebeziumConstants.HEADER_OPERATION);
                            // We will put everything as nested Map in order to utilize Camel's Type Format
                            final Map<String, Object> kinesisBody = new HashMap<>();

                            kinesisBody.put("key", key);
                            kinesisBody.put("value", value);
                            kinesisBody.put("operation", operation);

                            return kinesisBody;
                        })
                        // As we mentioned above, we will need to hash the key partition and set it into the headers
                        .process(exchange -> {
                            final Struct key = (Struct) exchange.getMessage().getHeader(DebeziumConstants.HEADER_KEY);
                            final String hash = String.valueOf(key.hashCode());

                            exchange.getMessage().setHeader(Kinesis2Constants.PARTITION_KEY, hash);
                        })
                        // Marshal everything to JSON, you can use any other data format such as Avro, Protobuf..etc, but in this example we will keep it to JSON for simplicity
                        .marshal().json(JsonLibrary.Jackson)
                        // Send our data to kinesis
                        .to("aws2-kinesis:{{kinesis.streamName}}?accessKey=RAW({{kinesis.accessKey}})"
                                + "&secretKey=RAW({{kinesis.secretKey}})"
                                + "&region={{kinesis.region}}")
                        .end();
            }
        };
    }