in src/main/java/com/aws/logaggregator/processor/BaseLogProcessor.java [165:206]
/**
 * Writes the given dataset to every sink connector in parallel and records the outcome.
 *
 * <p>One task per connector is run on a fixed thread pool sized to the number of connectors.
 * Each task calls {@code writeLogData} on its connector; an exception thrown by a connector is
 * logged and reported as a {@code false} status for that connector rather than aborting the
 * other sinks. Once all tasks have finished, the per-connector results are concatenated as
 * {@code connectorType:className:status,} entries (in the order of {@code connectors}, each
 * followed by a comma) and handed to {@code setProcessingHookStatus}.
 *
 * <p>If waiting for the tasks fails, the error is logged and the processing status is left
 * unset. When the failure is an interruption, the calling thread's interrupt flag is restored
 * so callers further up the stack can observe it.
 *
 * @param newDataset the dataset passed to each connector's {@code writeLogData}
 * @param connectors the sink connectors to write to; a {@code null} or empty list is a no-op
 */
protected void processSinks(Dataset newDataset, List<BaseLogConnector> connectors) {
    if (connectors == null || connectors.isEmpty()) {
        return;
    }
    ExecutorService executorService = Executors.newFixedThreadPool(connectors.size());
    try {
        // A list (not a hash set) keeps the tasks, and therefore the reported statuses,
        // in the same order as the connectors that were passed in.
        List<Callable<String>> callables = new java.util.ArrayList<>(connectors.size());
        for (BaseLogConnector hook : connectors) {
            callables.add(() -> {
                boolean sinkstatus = true;
                try {
                    hook.writeLogData(newDataset, sparkSession, initializer);
                } catch (Exception exception) {
                    // A failing sink must not prevent the remaining sinks from being reported.
                    logger.error("Exception thrown from SInk-->" + exception, exception);
                    sinkstatus = false;
                }
                return hook.getConnectorType() + ":" + hook.getClass().getName() + ":" + sinkstatus;
            });
        }

        // invokeAll blocks until every task has completed, so the get() calls below
        // do not wait any further.
        List<Future<String>> futures = executorService.invokeAll(callables);

        // Only this thread touches the builder, so no synchronized buffer is needed.
        StringBuilder sinkprocessingStatus = new StringBuilder();
        for (Future<String> future : futures) {
            String status = future.get();
            sinkprocessingStatus.append(status).append(",");
            logger.info("Processing Sink completed-->" + status);
        }
        setProcessingHookStatus(sinkprocessingStatus.toString());
    } catch (InterruptedException e) {
        // Restore the interrupt flag; swallowing it would hide the cancellation request.
        Thread.currentThread().interrupt();
        logger.error("Error processing hooks", e);
    } catch (Exception e) {
        logger.error("Error processing hooks", e);
    } finally {
        executorService.shutdown();
    }
}