in data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java [151:188]
private void runKafkaConsumer() {
    final Consumer<String, DataTransferEvent> mftEventConsumer = createConsumer();
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    // The poll loop runs on its own thread for the lifetime of the manager.
    executorService.submit(() -> {
        while (true) {
            ConsumerRecords<String, DataTransferEvent> consumerRecords = mftEventConsumer.poll(Duration.ofMillis(1000));
            if (!consumerRecords.isEmpty()) {
                // Fan the records of this poll out to the shared processing pool and
                // await the whole batch before polling again.
                CompletionService<Boolean> executorCompletionService = new ExecutorCompletionService<>(kafkaMessageProcessPool);
                List<Future<Boolean>> processingFutures = new ArrayList<>();
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
                for (TopicPartition partition : consumerRecords.partitions()) {
                    List<ConsumerRecord<String, DataTransferEvent>> partitionRecords = consumerRecords.records(partition);
                    for (ConsumerRecord<String, DataTransferEvent> record : partitionRecords) {
                        processingFutures.add(executorCompletionService.submit(() -> {
                            boolean success = processCallbackMessage(record.value());
                            logger.info("Processing DTE for task {} : {}", record.value().getTaskId(), success);
                            return success;
                        }));
                        // Records arrive in offset order, so this leaves the highest
                        // offset + 1 per partition for the batch commit below.
                        offsetsToCommit.put(partition, new OffsetAndMetadata(record.offset() + 1));
                    }
                }
                // take() hands back one future per submitted task, in completion order.
                for (int i = 0; i < processingFutures.size(); i++) {
                    try {
                        executorCompletionService.take().get();
                    } catch (Exception e) {
                        logger.error("Failed processing DTE", e);
                    }
                }
                // Commit only after every record of the batch has been attempted, so
                // offsets never run ahead of in-flight work. Failed records are logged
                // above but still committed, i.e. they are not redelivered.
                mftEventConsumer.commitSync(offsetsToCommit);
                logger.info("All messages processed. Moving to next round");
            }
        }
    });
}
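
The excerpt does not show createConsumer(). Below is a minimal sketch of what it could look like with the standard Kafka client API; the broker address, group id, topic name, and the DataTransferEventDeserializer class are illustrative assumptions, not taken from the source. The one property the poll loop above genuinely depends on is disabling auto-commit, since the loop commits offsets itself via commitSync.

private Consumer<String, DataTransferEvent> createConsumer() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed broker address
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "datasync-workflow-manager"); // assumed group id
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // Hypothetical deserializer for DataTransferEvent payloads.
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DataTransferEventDeserializer.class.getName());
    // Manual commitSync in the poll loop requires auto-commit to be off.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    Consumer<String, DataTransferEvent> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("mft-transfer-events"));   // assumed topic name
    return consumer;
}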
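
The while (true) loop has no exit path. A common companion pattern, sketched here under the assumption that mftEventConsumer and executorService are promoted from locals to instance fields, is to interrupt a blocked poll() with KafkaConsumer.wakeup(), the one consumer method that is safe to call from another thread:

// Sketch only: assumes mftEventConsumer and executorService become instance fields.
public void shutdown() {
    mftEventConsumer.wakeup();  // makes a blocked poll() throw WakeupException
    executorService.shutdown(); // stop accepting new work on the poll thread
}

// The poll thread would then wrap the loop as:
// try {
//     while (true) { /* poll, process, commit as above */ }
// } catch (WakeupException e) {
//     // shutdown requested; fall through to cleanup
// } finally {
//     mftEventConsumer.close();
// }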