public void run()

in Sample-Code-Snippets/Java/Spring-Boot-Kafka/eventhubs-client/src/main/java/com/Azure/Testing/EventHubKafkaClient/EventHubKafkaClientApplication.java [65:120]


	public void run(String... args) throws Exception {
		// Demo flow: publish a fixed batch of string events to the topic named by
		// eventhubName, pause briefly, then subscribe to the same topic and print
		// every record received. The consume loop never exits on its own, so this
		// method only returns by throwing.
		//
		// username, eventHubsConnectionString, kafkaBootstrapServers, eventhubName
		// and consumerGroupId are defined outside this method.

		// JAAS configuration for SASL/PLAIN: the connection string is used as the password.
		final String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required " +
				"username=\"" + username + "\" password=\"" + eventHubsConnectionString + "\";";

		// Connection and authentication settings shared by the producer and the consumer.
		// NOTE(review): SASL_PLAINTEXT sends the credentials unencrypted. That is presumably
		// intended for a local/emulator endpoint; a cloud Event Hubs namespace is expected
		// to require SASL_SSL - confirm before pointing this at a real namespace.
		final Properties connectionProperties = new Properties();
		connectionProperties.put(BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
		connectionProperties.put(SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
		connectionProperties.put(SASL_MECHANISM,           "PLAIN");
		connectionProperties.put(SASL_JAAS_CONFIG,         saslJaasConfig);

		// Producer
		final Properties producerProperties = new Properties();
		producerProperties.putAll(connectionProperties);
		producerProperties.put(KEY_SERIALIZER_CLASS_CONFIG,   StringSerializer.class.getCanonicalName());
		producerProperties.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());

		try (final Producer<String, String> producer = new KafkaProducer<>(producerProperties)) {
			final int numMessages = 100;
			for (int i = 0; i < numMessages; i++) {
				final String key = "Key: " + i;
				final String value = "Value: " + i;
				// send() is asynchronous; the callback reports the outcome of each record.
				producer.send(
						new ProducerRecord<>(eventhubName, key, value),
						(event, ex) -> {
							if (ex != null) {
								System.err.printf("Failed to produce event with key %s to topic %s%n", key, eventhubName);
								ex.printStackTrace();
							} else {
								System.out.printf("Produced event to topic %s [key: %s value: %s] %n", eventhubName, key, value);
							}
						});
			}
			// Wait for all buffered records to complete (and their callbacks to run)
			// before printing the summary; otherwise it would be printed while sends
			// are still in flight.
			producer.flush();
			System.out.printf("%s events were produced to topic %s%n", numMessages, eventhubName);
		}

		// Short pause between the produce and consume phases of the demo.
		TimeUnit.SECONDS.sleep(2);

		// Consumer
		final Properties consumerProperties = new Properties();
		consumerProperties.putAll(connectionProperties);
		consumerProperties.put(GROUP_ID_CONFIG,                 consumerGroupId);
		consumerProperties.put(AUTO_OFFSET_RESET_CONFIG,        "earliest");
		consumerProperties.put(ENABLE_AUTO_COMMIT_CONFIG,       true);
		consumerProperties.put(KEY_DESERIALIZER_CLASS_CONFIG,   StringDeserializer.class.getCanonicalName());
		consumerProperties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());

		try (final Consumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
			consumer.subscribe(Collections.singletonList(eventhubName));
			// Poll forever: the consumer is only closed if poll() (or the loop body) throws.
			while (true) {
				final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
				for (ConsumerRecord<String, String> record : records) {
					System.out.printf("Consumed event from topic %s: key = %-10s value = %s%n", eventhubName, record.key(), record.value());
				}
			}
		}
	}