public void process()

in src/main/java/com/aws/logaggregator/processor/stream/StreamLogProcessor.java [66:104]


    public void process(String type, ApplicationContext ctx) throws LogAggregatorException {

        logger.info("Data Processing Begin------->" + type);
        if (initializer == null || initializer.getMetdadataBean() == null) {
            logger.error("EXITING-->Metadata doesn't exist or the key--> " + type + "<--you have passed is missing in the metadata");
            return;
        }
//        if (initializer.schemaStruct == null) {
//            logger.error("SCHEMA IS NOT DEFINED FOR STREAM PROCESSING --> EXITING");
//            return;
//        }
        String sourceType = initializer.getMetdadataBean().getSource().getConfigoptions().get("sourcetype");
        if ("storage".equalsIgnoreCase(sourceType)) {
            sourceType = sourceType + "streaming";
        }
        String dataformat = initializer.getMetdadataBean().getSource().getConfigoptions()
                .get("logformat");
        String pattern = initializer.getMetdadataBean().getSource().getConfigoptions()
                .get("pattern");
        String flatten = initializer.getMetdadataBean().getSource().getConfigoptions()
                .get("flatten");
        String checkpointlocation = initializer.getMetdadataBean().getSource().getConfigoptions()
                .get("checkpointlocation");
        String decompress = initializer.getMetdadataBean().getSource().getConfigoptions()
                .get("decompress");
        BaseLogConnector sourceConnector = (BaseLogConnector) facBean.getBean(ctx, sourceType);
        Dataset ds = sourceConnector.readLogData(sparkSession, initializer);

//       Dataset finaldf  =  ds
//                //.filter((FilterFunction<Row>) row -> validator.validate(row, accumulator, initBean.partitionIndex))
//                .map(transformerBean, RowEncoder.apply(initializer.schemaStruct));
        try {
            writeOutput(ds, initializer, ctx, dataformat, decompress, pattern, sourceType, checkpointlocation);
        } catch (Exception e) {
            logger.error("Exception in Streaming-->", e);
        }


    }