in src/main/java/com/aws/logaggregator/LogAggregatorMainApplication.java [58:102]
/**
 * Entry point of a log aggregation run: validates the command line, loads the
 * Spring application context, looks up the Spark session and the log processor
 * for the requested source type, and runs the processor.
 *
 * <p>Expected arguments:
 * <ol>
 *   <li>{@code args[0]} - region name</li>
 *   <li>{@code args[1]} - path (the usage message calls it the table name)</li>
 *   <li>{@code args[2]} - key</li>
 *   <li>{@code args[3]} - source type (stream or batch); used to look up the
 *       processor bean</li>
 *   <li>{@code args[4]} - optional configuration parameter values; defaults to
 *       an empty string</li>
 * </ol>
 *
 * <p>Once the context has been loaded this method always terminates the JVM via
 * {@code System.exit(0)}, whether processing succeeded, failed, or no processor
 * bean was found. Failures during processing are logged, not rethrown.
 *
 * @param args command line arguments as described above
 * @throws Exception if the application context cannot be created
 */
public void startProcessing(String[] args) throws Exception {
    // Validate the command line before paying for the application context start-up.
    if (args.length < 4) {
        logger.error("Please pass Region name, table name, Key and the source type (stream or batch)");
        // NOTE(review): exit status 0 on a usage error is kept for backward
        // compatibility with existing launch scripts - confirm whether a
        // non-zero status would be preferable.
        System.exit(0);
    }

    final String region = args[0];
    final String path = args[1];
    final String key = args[2];
    final String sourceType = args[3];
    final String configParamValues = args.length > 4 ? args[4] : "";

    logger.info("Loading Log Aggregator Application Context Begin");
    ApplicationContext ctx =
            new AnnotationConfigApplicationContext(LogAggregatorMainApplication.class);
    logger.info("Loading Log Aggregator Application Context End");

    try {
        sparkSession = (SparkSession) new BaseBeanFactory().getBean(ctx, "spark-session");
        BaseLogProcessor logProcessorBean =
                (BaseLogProcessor) new BaseBeanFactory().getBean(ctx, sourceType);
        if (logProcessorBean == null) {
            // Nothing to run; the finally block still closes Spark and exits.
            return;
        }
        try {
            logProcessorBean.initializeEnvironment(path, key, region, configParamValues, sparkSession, ctx);
            logProcessorBean.process(key, ctx);
        } catch (Exception e) {
            logger.error("Exception thrown inside callable", e);
        }
        logger.info("Ending processing of type--> " + key);
    } catch (Exception e) {
        // The logger already records the stack trace; no printStackTrace() needed.
        logger.error("Exception thrown", e);
    } finally {
        logger.info("Closing spark session-->");
        // The session is null when its bean lookup failed; guard so that an NPE
        // here does not mask the original error or skip the JVM exit below.
        if (sparkSession != null) {
            sparkSession.close();
        }
        System.exit(0);
    }
}