in kogito-quarkus-examples/process-performance-client/src/main/java/org/kie/kogito/performance/client/RequestDispatcherRunner.java [79:108]
public Void call() throws Exception {
int consumedRequest = 0;
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
try (KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(properties, new ByteArrayDeserializer(),
new ByteArrayDeserializer());) {
long startDispatch = System.currentTimeMillis();
ThrowableConsumer errorCounter = new ThrowableConsumer();
for (int i = 0; i < numRequest; i++) {
dispatcher.dispatch(delay, errorCounter);
}
long endDispatch = System.currentTimeMillis();
kafkaConsumer.subscribe(Collections.singleton("done"));
while (numRequest > consumedRequest + errorCounter.errorCount()) {
logger.info("Consumed request: {}", consumedRequest);
ConsumerRecords<byte[], byte[]> events = kafkaConsumer.poll(Duration.ofSeconds(1));
events.forEach(this::collectTime);
consumedRequest += events.count();
}
if (errorCounter.errorCount() > 0) {
logger.error(errorCounter.toString());
}
logger.info("Time dispatching {}", endDispatch - startDispatch);
logger.info("Time from first finish to last finish {}", Duration.between(endTimes.first(), endTimes.last()));
}
return null;
}