in jbang/kafka-apicurio-secured-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/Produce.java [44:84]
/**
 * Loads the producer configuration, maps the registry and Keycloak settings onto the
 * serializer configuration, and sends a single {@code Order} record to the topic named
 * by the {@code topic} property.
 *
 * <p>The send is awaited for at most 5 seconds. On success the record offset is printed
 * to standard output; send failures are reported with a stack trace and the method
 * returns normally.
 *
 * @param args optional; {@code args[0]} is the path of the properties file. When absent,
 *             {@code DEFAULT_PROPERTIES_PATH} is used.
 * @throws IOException if the properties file cannot be opened or read
 * @throws NullPointerException if one of the required registry/Keycloak properties is missing
 */
public static void main(String[] args) throws IOException {
    String propertiesPath = args.length >= 1 ? args[0] : DEFAULT_PROPERTIES_PATH;

    Properties properties = new Properties();
    // Properties.load does not close the stream it reads from, so close it here.
    try (java.io.InputStream in = Files.newInputStream(Paths.get(propertiesPath))) {
        properties.load(in);
    }

    // Properties rejects null values, so a missing key would otherwise surface as a
    // message-less NullPointerException from put(); fail with the key name instead.
    String registryUrl = requireProperty(properties, "schema.registry.url");
    String authServiceUrl = requireProperty(properties, "keycloak.service.url");
    String authRealm = requireProperty(properties, "keycloak.realm");
    String clientId = requireProperty(properties, "keycloak.client.id");
    String clientSecret = requireProperty(properties, "keycloak.client.secret");
    String apicurioUsername = requireProperty(properties, "keycloak.apicurio.username");
    String apicurioPassword = requireProperty(properties, "keycloak.apicurio.password");

    // Registry location and authentication settings for the serializer.
    properties.put(SerdeConfig.REGISTRY_URL, registryUrl);
    properties.put(SerdeConfig.AUTH_SERVICE_URL, authServiceUrl);
    properties.put(SerdeConfig.AUTH_REALM, authRealm);
    properties.put(SerdeConfig.AUTH_CLIENT_ID, clientId);
    properties.put(SerdeConfig.AUTH_USERNAME, apicurioUsername);
    properties.put(SerdeConfig.AUTH_PASSWORD, apicurioPassword);
    properties.put(SerdeConfig.AUTH_CLIENT_SECRET, clientSecret);
    properties.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);

    // Kafka serializers: String keys, Avro values using the reflection-based datum provider.
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class);
    properties.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, ReflectAvroDatumProvider.class.getName());

    try (KafkaProducer<String, Order> orderProducer = new KafkaProducer<>(properties)) {
        Order order = new Order(1, "item", "user", 3.0, "A really nice item I do love");
        String topic = properties.getProperty("topic");
        ProducerRecord<String, Order> record = new ProducerRecord<>(topic, "key", order);
        // Block until the send completes so the offset can be reported.
        RecordMetadata result = orderProducer.send(record).get(5, TimeUnit.SECONDS);
        System.out.println("Sent record with offset " + result.offset());
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (ExecutionException | TimeoutException | RestClientException e) {
        e.printStackTrace();
    }
}

/** Returns the value of {@code key}, or throws a NullPointerException naming the key if it is absent. */
private static String requireProperty(Properties properties, String key) {
    return java.util.Objects.requireNonNull(
            properties.getProperty(key), "Missing required property: " + key);
}