protected void processSinks()

in src/main/java/com/aws/logaggregator/processor/BaseLogProcessor.java [165:206]


    protected void processSinks(Dataset newDataset, List<BaseLogConnector> connectors) {
        StringBuffer sinkprocessingStatus = new StringBuffer();
        if (connectors.size() == 0) {
            return;
        }
        ExecutorService executorService = Executors.newFixedThreadPool(connectors.size());
        try {

            Set<Callable<String>> callables = new HashSet<>();

            for (BaseLogConnector hook : connectors) {
                callables.add(new Callable<String>() {
                    public String call() {
                        boolean sinkstatus = true;
                        try {
                            hook.writeLogData(newDataset, sparkSession, initializer);
                        } catch (Exception exception) {
                            logger.error("Exception thrown from SInk-->" + exception, exception);
                            sinkstatus = false;
                        }

                        return hook.getConnectorType() + ":" + hook.getClass().getName() + ":" + sinkstatus;
                    }
                });
            }

            List<Future<String>> futures = executorService.invokeAll(callables);
            for (Future<String> future : futures) {
                String status = future.get();
                sinkprocessingStatus.append(status);
                sinkprocessingStatus.append(",");
                logger.info("Processing Sink completed-->" + status);
            }
            setProcessingHookStatus(sinkprocessingStatus.toString());
        } catch (Exception e) {

            logger.error("Error processing hooks", e);
        } finally {
            executorService.shutdown();
        }

    }