public void configure()

in debezium-eventhubs-blob/src/main/java/org/apache/camel/example/debezium/eventhubs/blob/consumer/DebeziumMySqlConsumerToAzureEventHubsRouteBuilder.java [32:80]


    public void configure() throws Exception {
        // Initial Debezium route that runs and listens for changes:
        // it first performs an initial snapshot (using "select * from") in case no offsets
        // exist yet for the connector, and then listens to the MySQL binlog for DB events such as UPDATE, INSERT and DELETE
        from("debezium-mysql:{{debezium.mysql.name}}?"
                + "databaseServerId={{debezium.mysql.databaseServerId}}"
                + "&databaseHostname={{debezium.mysql.databaseHostName}}"
                + "&databasePort={{debezium.mysql.databasePort}}"
                + "&databaseUser={{debezium.mysql.databaseUser}}"
                + "&databasePassword={{debezium.mysql.databasePassword}}"
                + "&databaseServerName={{debezium.mysql.databaseServerName}}"
                + "&databaseHistoryFileFilename={{debezium.mysql.databaseHistoryFileName}}"
                + "&databaseIncludeList={{debezium.mysql.databaseIncludeList}}"
                + "&tableIncludeList={{debezium.mysql.tableIncludeList}}"
                + "&offsetStorageFileName={{debezium.mysql.offsetStorageFileName}}")
                .routeId("FromDebeziumMySql")
                // We need to prepare the data for Azure Event Hubs. Therefore we hash the key (so that records with the
                // same key land on the same partition) and convert it to a string, but that means we have to preserve the
                // key information in the message body so it is not lost downstream.
                // Note: with Kafka you most probably would not need these transformations, as you can send the key as an object
                // and the broker hashes it to place the record in the correct topic partition.
                .setBody(exchange -> {
                    // Using Camel's type converters we can retrieve the data as a Map: the Debezium component ships a
                    // Struct-to-Map type converter, so asking for Map.class converts the Struct for us
                    final Map key = exchange.getMessage().getHeader(DebeziumConstants.HEADER_KEY, Map.class);
                    final Map value = exchange.getMessage().getBody(Map.class);
                    // Also, we need the operation in order to determine when an INSERT, UPDATE or DELETE happens
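                    // Debezium encodes the operation as a single character: "c" (create/insert), "u" (update),
                    // "d" (delete) or "r" (read, emitted for rows captured during the initial snapshot)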
                    final String operation = (String) exchange.getMessage().getHeader(DebeziumConstants.HEADER_OPERATION);
                    // We put everything into a nested Map so that Camel's data format (JSON, further down) can marshal it
                    final Map<String, Object> eventHubBody = new HashMap<>();

                    eventHubBody.put("key", key);
                    eventHubBody.put("value", value);
                    eventHubBody.put("operation", operation);

                    return eventHubBody;
                })
                // As mentioned above, we hash the key and set it as the partition key header
                .process(exchange -> {
                    final Struct key = (Struct) exchange.getMessage().getHeader(DebeziumConstants.HEADER_KEY);
                    final String hash = String.valueOf(key.hashCode());
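                    // Identical keys produce identical hashes, so all changes for the same row end up on the
                    // same Event Hubs partition and per-key ordering is preserved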

                    exchange.getMessage().setHeader(EventHubsConstants.PARTITION_KEY, hash);
                })
                // Marshal everything to JSON; you could use any other data format such as Avro or Protobuf, but in this example we keep it as JSON for simplicity
                .marshal().json(JsonLibrary.Jackson)
                // Send our data to Azure Event Hubs
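                // RAW() tells Camel to take the connection string value as-is, without URI-encoding it
                // (Event Hubs connection strings contain characters such as '=' and ';')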
                .to("azure-eventhubs:?connectionString=RAW({{eventhubs.connectionString}})")
                .end();
    }
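
The {{...}} tokens above are resolved by Camel's properties component at startup. As a minimal, hypothetical sketch (the application class name is an assumption, not part of this file), the route builder could be bootstrapped with Camel Main roughly like this, with the debezium.mysql.* and eventhubs.connectionString keys expected in application.properties on the classpath:

    import org.apache.camel.main.Main;

    public final class DebeziumMySqlConsumerApplication {

        public static void main(String[] args) throws Exception {
            Main main = new Main();
            // Register the route; Camel Main loads application.properties from the classpath,
            // which is where the debezium.mysql.* and eventhubs.connectionString keys are expected to live
            main.configure().addRoutesBuilder(new DebeziumMySqlConsumerToAzureEventHubsRouteBuilder());
            // Starts Camel and blocks until the JVM is terminated
            main.run(args);
        }
    }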