in jbang/kafka-apicurio-secured-json-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/Produce.java [43:91]
/**
 * Sends two sample records to a Kafka topic using the JSON Schema serializer.
 *
 * <p>Configuration is read from a properties file: the path is {@code args[0]} when
 * given, otherwise {@code DEFAULT_PROPERTIES_PATH}. The registry URL and the Keycloak
 * settings found in that file are copied onto the corresponding {@code SerdeConfig}
 * keys, and the same {@link Properties} object is then used to build the producer.
 *
 * <p>Two records are sent with key {@code "test"}: first a {@code Product}, then a
 * {@code WrongProduct}. Each send waits at most 5 seconds for the broker
 * acknowledgement. Validation is switched on unless the file already sets
 * {@code SerdeConfig.VALIDATION_ENABLED}; presumably the second record is meant to be
 * rejected by schema validation (NOTE(review): confirm against the registered schema).
 *
 * @param args optional; {@code args[0]} is the path of the properties file
 * @throws IOException if the properties file cannot be opened or read
 * @throws NullPointerException if a required property is missing from the file
 */
public static void main(String[] args) throws IOException {
    String propertiesPath = DEFAULT_PROPERTIES_PATH;
    if (args.length >= 1) {
        propertiesPath = args[0];
    }

    Properties properties = new Properties();
    // Properties.load leaves the stream open, so close it explicitly.
    try (java.io.InputStream in = Files.newInputStream(Paths.get(propertiesPath))) {
        properties.load(in);
    }

    // Every one of these is mandatory: Properties rejects null values, so a missing
    // key would otherwise surface as a NullPointerException without any message.
    String registryUrl = requiredProperty(properties, "schema.registry.url");
    String authServiceUrl = requiredProperty(properties, "keycloak.service.url");
    String topicName = requiredProperty(properties, "topic");
    String authRealm = requiredProperty(properties, "keycloak.realm");
    String clientId = requiredProperty(properties, "keycloak.client.id");
    String clientSecret = requiredProperty(properties, "keycloak.client.secret");
    String apicurioUsername = requiredProperty(properties, "keycloak.apicurio.username");
    String apicurioPassword = requiredProperty(properties, "keycloak.apicurio.password");

    properties.put(SerdeConfig.REGISTRY_URL, registryUrl);
    properties.put(SerdeConfig.AUTH_SERVICE_URL, authServiceUrl);
    properties.put(SerdeConfig.AUTH_REALM, authRealm);
    properties.put(SerdeConfig.AUTH_CLIENT_ID, clientId);
    properties.put(SerdeConfig.AUTH_USERNAME, apicurioUsername);
    properties.put(SerdeConfig.AUTH_PASSWORD, apicurioPassword);
    properties.put(SerdeConfig.AUTH_CLIENT_SECRET, clientSecret);
    properties.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSchemaKafkaSerializer.class);
    // Keep an explicit setting from the file; only default to enabled when absent.
    properties.putIfAbsent(SerdeConfig.VALIDATION_ENABLED, Boolean.TRUE);

    try (KafkaProducer<Object, Object> orderProducer = new KafkaProducer<>(properties)) {
        Product prod = new Product();
        prod.setId(1);
        prod.setName("product");
        ProducerRecord<Object, Object> record = new ProducerRecord<>(topicName, "test", prod);

        WrongProduct prod1 = new WrongProduct();
        prod1.setElement(1);
        prod1.setText("product-wrong");
        prod1.setDescription("Hello");
        ProducerRecord<Object, Object> record1 = new ProducerRecord<>(topicName, "test", prod1);

        RecordMetadata result = orderProducer.send(record).get(5, TimeUnit.SECONDS);
        System.out.println("Sent record with offset " + result.offset());
        result = orderProducer.send(record1).get(5, TimeUnit.SECONDS);
        System.out.println("Sent record with offset " + result.offset());
    } catch (InterruptedException e) {
        // Restore the interrupt status so it is not lost by catching the exception.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (ExecutionException | TimeoutException | RestClientException e) {
        e.printStackTrace();
    }
}

/**
 * Returns the value of {@code key}, failing with a descriptive message when it is absent.
 *
 * @param properties the loaded configuration
 * @param key the property name to look up
 * @return the non-null property value
 * @throws NullPointerException if {@code key} has no value in {@code properties}
 */
private static String requiredProperty(Properties properties, String key) {
    return java.util.Objects.requireNonNull(
            properties.getProperty(key), "Missing required property: " + key);
}