in kafka/src/main/java/org/apache/camel/example/kafka/MessagePublisherClient.java [41:89]
/**
 * Boots a Camel context configured for Kafka, publishes four sample messages through
 * the routes supplied by {@code createRouteBuilder()}, and then waits five minutes
 * before the context is closed.
 *
 * <p>The message bodies are derived from one another, so the prefixes accumulate:
 * the second body is the first prefixed with {@code "TOPIC "}, the third adds
 * {@code "PART 0 : "} in front of that, and the fourth adds {@code "PART 1 : "}.
 *
 * @param args command-line arguments; not read by this method
 * @throws Exception if setting up the context, sending a message, or the final wait fails
 */
public static void main(String[] args) throws Exception {
    LOG.info("About to run Kafka-camel integration...");

    // All four bodies share the timestamp captured here.
    final String baseMessage = "Test Message from MessagePublisherClient " + Calendar.getInstance().getTime();
    final String topicMessage = "TOPIC " + baseMessage;
    final String partitionZeroMessage = "PART 0 : " + topicMessage;
    final String partitionOneMessage = "PART 1 : " + partitionZeroMessage;

    try (CamelContext context = new DefaultCamelContext()) {
        // Configuration is read from the classpath.
        context.getPropertiesComponent().setLocation("classpath:application.properties");
        // Register the Kafka component, then the routes that publish to it.
        setUpKafkaComponent(context);
        context.addRoutes(createRouteBuilder());

        try (ProducerTemplate template = context.createProducerTemplate()) {
            context.start();

            // First send: explicit partition key and message key.
            final Map<String, Object> routedHeaders = new HashMap<>();
            routedHeaders.put(KafkaConstants.PARTITION_KEY, 0);
            routedHeaders.put(KafkaConstants.KEY, "1");
            template.sendBodyAndHeaders(DIRECT_KAFKA_START, baseMessage, routedHeaders);

            // Second send: same map (partition key retained), new key, topic name in a header.
            routedHeaders.put(KafkaConstants.KEY, "2");
            routedHeaders.put(KafkaConstants.TOPIC, "TestLog");
            template.sendBodyAndHeaders("direct:kafkaStartNoTopic", topicMessage, routedHeaders);

            // Third and fourth sends: only the message key is supplied.
            final Map<String, Object> partitionerHeaders = new HashMap<>();
            partitionerHeaders.put(KafkaConstants.KEY, "AB"); // This should go to partition 0
            template.sendBodyAndHeaders(DIRECT_KAFKA_START_WITH_PARTITIONER, partitionZeroMessage, partitionerHeaders);

            partitionerHeaders.put(KafkaConstants.KEY, "ABC"); // This should go to partition 1
            template.sendBodyAndHeaders(DIRECT_KAFKA_START_WITH_PARTITIONER, partitionOneMessage, partitionerHeaders);
        }

        LOG.info("Successfully published event to Kafka.");
        System.out.println("Enter text on the line below : [Press Ctrl-C to exit.] ");

        // The context stays open for five minutes; leaving this block closes it.
        final long waitMillis = 5L * 60 * 1000;
        Thread.sleep(waitMillis);
    }
}