in jbang/azure-eventhubs-kafka-azure-schema-registry/azure-identity/src/main/java/com/acme/example/eventhubs/Produce.java [44:80]
/**
 * Loads producer settings from a properties file, configures SASL_SSL/PLAIN
 * authentication from the {@code connectionstring} property, and sends a single
 * Avro-serialized {@code Order} record to the topic named by the {@code topic}
 * property.
 *
 * @param args optional; {@code args[0]} overrides the default properties file path
 * @throws IOException if the properties file cannot be opened or read
 * @throws IllegalStateException if the {@code connectionstring} property is missing
 */
public static void main(String[] args) throws IOException {
    String propertiesPath = DEFAULT_PROPERTIES_PATH;
    if (args.length >= 1) {
        propertiesPath = args[0];
    }

    Properties properties = new Properties();
    // Close the file handle as soon as the properties are loaded.
    try (java.io.InputStream in = Files.newInputStream(Paths.get(propertiesPath))) {
        properties.load(in);
    }

    TokenCredential credential = new DefaultAzureCredentialWrapper();

    String connectionString = properties.getProperty("connectionstring");
    if (connectionString == null) {
        // Fail fast instead of building a JAAS entry with the literal text "null".
        throw new IllegalStateException(
            "Missing required property 'connectionstring' in " + propertiesPath);
    }
    // The value contains ';' and '=' which are JAAS delimiters, so it must be a
    // quoted string. Respect a value that is already quoted in the properties file.
    String password = connectionString.trim();
    boolean alreadyQuoted = password.length() >= 2
        && password.startsWith("\"")
        && password.endsWith("\"");
    if (!alreadyQuoted) {
        password = "\"" + password + "\"";
    }
    // Options in a JAAS entry are whitespace-separated: note the space before "password".
    String saslJaas = "org.apache.kafka.common.security.plain.PlainLoginModule required "
        + "username=\"$ConnectionString\" "
        + "password=" + password + ";";

    properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    properties.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaas);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    properties.put(KafkaAvroSerializerConfig.SCHEMA_GROUP_CONFIG, "avro");
    properties.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS_CONFIG, true);
    properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_CREDENTIAL_CONFIG, credential);

    try (KafkaProducer<String, Order> orderProducer = new KafkaProducer<>(properties)) {
        Order order = new Order(1, "item", "user", 3.0);
        String topic = properties.getProperty("topic");
        ProducerRecord<String, Order> record = new ProducerRecord<>(topic, "key", order);
        // Block for the broker acknowledgement so failures surface here.
        RecordMetadata result = orderProducer.send(record).get(5, TimeUnit.SECONDS);
        System.out.println("Sent record with offset " + result.offset());
    } catch (ExecutionException | InterruptedException | TimeoutException e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag that catching the exception cleared.
            Thread.currentThread().interrupt();
        }
        e.printStackTrace();
        // The producer has already been closed: try-with-resources closes its
        // resources before any catch clause runs.
        System.exit(1);
    }
}