in Sample-Code-Snippets/Java/Spring-Boot-Kafka/eventhubs-client/src/main/java/com/Azure/Testing/EventHubKafkaClient/EventHubKafkaClientApplication.java [65:120]
/**
 * Produces a fixed batch of string events to the topic named by {@code eventhubName} and then
 * consumes from that same topic in an endless polling loop, printing every event to stdout.
 *
 * <p>The consumer loop has no exit condition: this method returns only if one of the Kafka
 * calls (or the intermediate sleep) throws.
 *
 * @param args command-line arguments; not used by this method
 * @throws Exception if the pause between producing and consuming is interrupted, or if a
 *                   Kafka client call fails
 */
public void run(String... args) throws Exception {
    // Producer
    final Properties producerProperties = buildProducerProperties();
    try (final Producer<String, String> producer = new KafkaProducer<>(producerProperties)) {
        final int numMessages = 100;
        for (int i = 0; i < numMessages; i++) {
            final String key = "Key: " + i;
            final String value = "Value: " + i;
            // send() is asynchronous; the callback reports the outcome of each individual record.
            producer.send(
                    new ProducerRecord<>(eventhubName, key, value),
                    (event, ex) -> {
                        if (ex != null) {
                            ex.printStackTrace();
                        } else {
                            System.out.printf("Produced event to topic %s [key: %s value: %s] %n", eventhubName, key, value);
                        }
                    });
        }
        // Wait for the buffered records to complete so the summary below is printed after the
        // sends have finished rather than while they are still queued. Individual failures are
        // reported by the callback above.
        producer.flush();
        System.out.printf("%d events were produced to topic %s%n", numMessages, eventhubName);
    }

    // NOTE(review): presumably gives the broker a moment before the consumer starts — confirm
    // whether this pause is actually required.
    TimeUnit.SECONDS.sleep(2);

    // Consumer
    final Properties consumerProperties = buildConsumerProperties();
    try (final Consumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
        consumer.subscribe(Collections.singletonList(eventhubName));
        // Polls forever; the loop ends only when poll() throws.
        while (true) {
            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Consumed event from topic %s: key = %-10s value = %s%n", eventhubName, record.key(), record.value());
            }
        }
    }
}

/** Builds the bootstrap and SASL/PLAIN settings shared by the producer and the consumer. */
private Properties buildConnectionProperties() {
    final String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required " +
            "username=\"" + username + "\" password=\"" + eventHubsConnectionString + "\";";

    final Properties properties = new Properties();
    properties.put(BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
    // NOTE(review): SASL_PLAINTEXT sends the credentials unencrypted; presumably intended for a
    // local/emulator endpoint — confirm before pointing this at a remote namespace.
    properties.put(SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    properties.put(SASL_MECHANISM, "PLAIN");
    properties.put(SASL_JAAS_CONFIG, saslJaasConfig);
    return properties;
}

/** Builds the producer configuration: shared connection settings plus string serializers. */
private Properties buildProducerProperties() {
    final Properties properties = buildConnectionProperties();
    properties.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    return properties;
}

/** Builds the consumer configuration: shared connection settings, group/offset options and string deserializers. */
private Properties buildConsumerProperties() {
    final Properties properties = buildConnectionProperties();
    properties.put(GROUP_ID_CONFIG, consumerGroupId);
    // Start from the earliest available offset when the group has no committed position.
    properties.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put(ENABLE_AUTO_COMMIT_CONFIG, true);
    properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return properties;
}