public void process()

in src/main/java/com/aws/logaggregator/processor/batch/BatchLogProcessor.java [47:75]


    public void process(String type, ApplicationContext ctx) throws LogAggregatorException {
        logger.info("Data Processing Begin------->" + type);
        if (initializer == null || initializer.getMetdadataBean() == null) {
            logger.error("EXITING-->Metadata doesn't exist or the key--> " + type + "<--you have passed is missing in the metadata");
            return;
        }
        String sourceType = initializer.getMetdadataBean().getSource().getConfigoptions().get("sourcetype");
        BaseLogConnector sourceConnector = (BaseLogConnector) facBean.getBean(ctx, sourceType);
        Dataset ds = sourceConnector.readLogData(sparkSession, initializer);

        if (initializer.schemaStruct != null) {

        } else {
            ds = ds
                    // .filter((FilterFunction<Row>) row -> validator.validate(row, accumulator, initBean.partitionIndex))
                    .map(transformerBean, RowEncoder.apply(ds.schema()));
        }
        ds = transformerBean.parse(ds);
        ds.persist(StorageLevel.MEMORY_AND_DISK());


        List<BaseLogConnector> sinkConnectors = new ArrayList<BaseLogConnector>();
        for (LogAggregatorMetadata.Logaggregator.Destination sink : initializer.getMetdadataBean().getDestination()) {
            BaseLogConnector sinkConnector = (BaseLogConnector) facBean.getBean(ctx, sink.getConfigoptions().get("destinationtype"));
            sinkConnectors.add(sinkConnector);
        }
        processSinks(ds, sinkConnectors);
        ds.unpersist();
    }