private void runKafkaConsumer()

in data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java [151:188]


    private void runKafkaConsumer() {

        final Consumer<String, DataTransferEvent> mftEventConsumer = createConsumer();
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(() -> {
            while (true) {
                ConsumerRecords<String, DataTransferEvent> consumerRecords = mftEventConsumer.poll(Duration.ofMillis(1000));
                if (!consumerRecords.isEmpty()) {

                    CompletionService<Boolean> executorCompletionService = new ExecutorCompletionService<>(kafkaMessageProcessPool);

                    List<Future<Boolean>> processingFutures = new ArrayList<>();

                    for (TopicPartition partition : consumerRecords.partitions()) {
                        List<ConsumerRecord<String, DataTransferEvent>> partitionRecords = consumerRecords.records(partition);
                        for (ConsumerRecord<String, DataTransferEvent> record : partitionRecords) {
                            processingFutures.add(executorCompletionService.submit(() -> {
                                boolean success = processCallbackMessage(record.value());
                                logger.info("Processing DTE for task " + record.value().getTaskId() + " : " + success);
                                return success;
                            }));

                            mftEventConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
                        }
                    }

                    for (Future<Boolean> f : processingFutures) {
                        try {
                            executorCompletionService.take().get();
                        } catch (Exception e) {
                            logger.error("Failed processing DTE", e);
                        }
                    }
                    logger.info("All messages processed. Moving to next round");
                }
            }
        });
    }