public void startProcessing()

in src/main/java/com/aws/logaggregator/LogAggregatorMainApplication.java [58:102]


    /**
     * Boots the application context, resolves the log processor bean for the
     * requested source type and runs it, then closes the Spark session and
     * terminates the JVM.
     *
     * <p>Expected positional arguments:
     * <ol>
     *   <li>{@code args[0]} - region name</li>
     *   <li>{@code args[1]} - path (the usage message calls this the table name)</li>
     *   <li>{@code args[2]} - key</li>
     *   <li>{@code args[3]} - source type; also used as the bean name of the
     *       {@link BaseLogProcessor} to run</li>
     *   <li>{@code args[4]} - optional configuration parameter values; defaults
     *       to the empty string when absent</li>
     * </ol>
     *
     * <p>This method does not return normally: every path ends in
     * {@code System.exit(0)}. Failures are logged rather than propagated.
     *
     * @param args command-line arguments as described above
     * @throws Exception declared for interface compatibility; exceptions raised
     *                   while processing are caught and logged inside this method
     */
    public void startProcessing(String[] args) throws Exception {
        logger.info("Loading Log Aggregator Application Context Begin");

        ApplicationContext ctx =
                new AnnotationConfigApplicationContext(LogAggregatorMainApplication.class);

        logger.info("Loading Log Aggregator Application Context End");
        if (args.length < 4) {
            logger.error("Please pass Region name, table name, Key and the source type (stream or batch)");
            // NOTE(review): a usage error exits with status 0, so callers cannot
            // distinguish it from success. Kept as-is to preserve the existing
            // process contract - confirm whether a non-zero status is wanted.
            System.exit(0);
        }

        // The fifth argument is optional.
        final String configParamValues = args.length > 4 ? args[4] : "";

        try {
            final String region = args[0];
            final String path = args[1];
            final String key = args[2];
            final String sourceType = args[3];

            sparkSession = (SparkSession) new BaseBeanFactory().getBean(ctx, "spark-session");
            BaseLogProcessor logProcessorBean =
                    (BaseLogProcessor) new BaseBeanFactory().getBean(ctx, sourceType);

            if (logProcessorBean == null) {
                // Previously a silent return; leave a trace so a misconfigured
                // source type can be diagnosed. The finally block still runs.
                logger.error("No log processor bean found for source type--> " + sourceType);
                return;
            }
            try {
                logProcessorBean.initializeEnvironment(path, key, region, configParamValues, sparkSession, ctx);
                logProcessorBean.process(key, ctx);
            } catch (Exception e) {
                logger.error("Exception thrown inside callable", e);
            }
            // The message refers to the type, so report the source type
            // (the original printed args[2], which is the key).
            logger.info("Ending processing of type--> " + sourceType);
        } catch (Exception e) {
            // The logger call already records the stack trace.
            logger.error("Exception thrown", e);
        } finally {
            logger.info("Closing spark session-->");
            // The session is null when the bean lookup itself failed; closing it
            // unguarded would raise a NullPointerException here and skip the exit.
            if (sparkSession != null) {
                sparkSession.close();
            }
            // NOTE(review): exits with status 0 even after a logged failure.
            System.exit(0);
        }
    }