in slack/slack-source-avro-apicurio-schema-registry/kafka-avro-basic-consumer/src/main/java/org/apache/camel/kafkaconnector/SimpleConsumer.java [24:51]
public static void main(String[] args) throws JsonProcessingException {
final Logger LOG = LoggerFactory.getLogger(SimpleConsumer.class);
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer",AvroKafkaDeserializer.class.getName());
props.put("apicurio.registry.url","http://localhost:8080/api");
props.put("group.id", UUID.randomUUID().toString());
props.put("auto.offset.reset", "earliest");
KafkaConsumer<String, String> cons = new KafkaConsumer<String, String>(props);
List<String> topics = new ArrayList<String>();
topics.add(args[0]);
cons.subscribe(topics);
while (true) {
ConsumerRecords<String, String> consumerRecords = cons.poll(Duration.ofMillis(1000L));
if (consumerRecords.count() > 0) {
for (Iterator iterator = consumerRecords.iterator(); iterator.hasNext();) {
ConsumerRecord<String, Utf8> rec = (ConsumerRecord<String, Utf8>) iterator.next();
LOG.info(((Utf8) rec.value()).toString());
}
}
}
}