in jbang/kafka-apicurio-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/Produce.java [43:69]
public static void main(String[] args) throws IOException {
String propertiesPath = DEFAULT_PROPERTIES_PATH;
if (args.length >= 1) {
propertiesPath = args[0];
}
Properties properties = new Properties();
properties.load(Files.newInputStream(Paths.get(propertiesPath)));
properties.put(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
properties.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class);
properties.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, ReflectAvroDatumProvider.class.getName());
try (KafkaProducer<String, Order> orderProducer = new KafkaProducer<>(properties)) {
Order order = new Order(1, "item", "user", 3.0, "A really nice item I do love");
String topic = properties.getProperty("topic");
ProducerRecord<String, Order> record = new ProducerRecord<>(topic, "key", order);
RecordMetadata result = orderProducer.send(record).get(5, TimeUnit.SECONDS);
System.out.println("Sent record with offset " + result.offset());
} catch (ExecutionException | InterruptedException | TimeoutException e) {
e.printStackTrace();
System.exit(1);
}
}