in debezium-eventhubs-blob/src/main/java/org/apache/camel/example/debezium/eventhubs/blob/consumer/DebeziumMySqlConsumerToAzureEventHubsRouteBuilder.java [32:80]
public void configure() throws Exception {
// Initial Debezium route that will run and listens to the changes,
// first it will perform an initial snapshot using (select * from) in case there are no offsets
// exists for the connector, and then it will listen to MySQL binlogs for any DB events such as (UPDATE, INSERT and DELETE)
from("debezium-mysql:{{debezium.mysql.name}}?"
+ "databaseServerId={{debezium.mysql.databaseServerId}}"
+ "&databaseHostname={{debezium.mysql.databaseHostName}}"
+ "&databasePort={{debezium.mysql.databasePort}}"
+ "&databaseUser={{debezium.mysql.databaseUser}}"
+ "&databasePassword={{debezium.mysql.databasePassword}}"
+ "&databaseServerName={{debezium.mysql.databaseServerName}}"
+ "&databaseHistoryFileFilename={{debezium.mysql.databaseHistoryFileName}}"
+ "&databaseIncludeList={{debezium.mysql.databaseIncludeList}}"
+ "&tableIncludeList={{debezium.mysql.tableIncludeList}}"
+ "&offsetStorageFileName={{debezium.mysql.offsetStorageFileName}}")
.routeId("FromDebeziumMySql")
// We will need to prepare the data for Azure EventHubs Therefore, we will hash the key to make sure our record land on the same partition
// and convert it to string, but that means we need to preserve the key information into the message body in order not to lose this information downstream.
// Note: If you'd use Kafka, most probably you will not need these transformations as you can send the key as an object and Kafka will do
// the rest to hash it in the broker in order to place it in the correct topic's partition.
.setBody(exchange -> {
// Using Camel Data Format, we can retrieve our data in Map since Debezium component has a Type Converter from Struct to Map, you need to specify the Map.class
// in order to convert the data from Struct to Map
final Map key = exchange.getMessage().getHeader(DebeziumConstants.HEADER_KEY, Map.class);
final Map value = exchange.getMessage().getBody(Map.class);
// Also, we need the operation in order to determine when an INSERT, UPDATE or DELETE happens
final String operation = (String) exchange.getMessage().getHeader(DebeziumConstants.HEADER_OPERATION);
// We we will put everything as nested Map in order to utilize Camel's Type Format
final Map<String, Object> eventHubBody = new HashMap<>();
eventHubBody.put("key", key);
eventHubBody.put("value", value);
eventHubBody.put("operation", operation);
return eventHubBody;
})
// As we mentioned above, we will need to hash the key partition and set it into the headers
.process(exchange -> {
final Struct key = (Struct) exchange.getMessage().getHeader(DebeziumConstants.HEADER_KEY);
final String hash = String.valueOf(key.hashCode());
exchange.getMessage().setHeader(EventHubsConstants.PARTITION_KEY, hash);
})
// Marshal everything to JSON, you can use any other data format such as Avro, Protobuf..etc, but in this example we will keep it to JSON for simplicity
.marshal().json(JsonLibrary.Jackson)
// Send our data to Azure Event Hubs
.to("azure-eventhubs:?connectionString=RAW({{eventhubs.connectionString}})")
.end();
}