protected void writeOutput()

in src/main/java/com/aws/logaggregator/processor/stream/StreamLogProcessor.java [107:220]


    /**
     * Starts the streaming query for {@code ds} and blocks until it terminates.
     *
     * <p>Every micro-batch is optionally decompressed, parsed according to
     * {@code dataformat}, passed through {@code transformerBean} and, when the
     * result is non-empty, handed to every configured sink connector via
     * {@code processSinks}. {@code postProcess} is invoked after the parsing/sink
     * stage of each batch with {@code "EXCEPTION"} if that stage failed and an
     * empty string otherwise.
     *
     * @param ds                 streaming dataset to write; its schema is used as the row
     *                           encoder schema when {@code initializer.schemaStruct} is null
     * @param initializationBean supplies the {@code schema} used by the schema-based parser
     *                           for the {@code "text"} format
     * @param ctx                application context used to look up the sink connector beans
     * @param dataformat         {@code "xml"} or {@code "json"} (case-insensitive); any other
     *                           value takes the pattern/schema based row parsing path
     * @param decompress         {@code "true"} (case-insensitive) to run the {@code decompress}
     *                           UDF over the {@code data} column before parsing
     * @param pattern            when non-null, rows are parsed with {@code PatternBasedParserUtil}
     *                           (only on the non-xml/non-json path)
     * @param flatten            {@code "true"} (case-insensitive) to flatten parsed XML with
     *                           {@code flattendf} before transformation
     * @param checkpointlocation value for the {@code checkpointLocation} option of the query
     * @throws Exception if the query cannot be started or terminates with an error
     */
    protected void writeOutput(Dataset ds,
                               LogAppInitializer initializationBean, ApplicationContext ctx,
                               String dataformat, String decompress, String pattern, String flatten, String checkpointlocation) throws Exception {
        sparkSession.streams().addListener(new StreamingQueryListener() {
            @Override
            public void onQueryStarted(QueryStartedEvent queryStarted) {
                logger.info("Query started: " + queryStarted.id());
            }

            @Override
            public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
                logger.info("Query terminated: " + queryTerminated.id());
            }

            @Override
            public void onQueryProgress(QueryProgressEvent queryProgress) {
                // Progress events are intentionally ignored.
            }
        });

        // Resolve one sink connector bean per configured destination.
        List<BaseLogConnector> sinkConnectors = new ArrayList<>();
        for (LogAggregatorMetadata.Logaggregator.Destination destination : initializer.getMetdadataBean().getDestination()) {
            BaseLogConnector sinkConnector = (BaseLogConnector) facBean.getBean(ctx, destination.getConfigoptions().get("destinationtype"));
            sinkConnectors.add(sinkConnector);
        }

        // An explicit VoidFunction2 is kept (rather than a lambda) so the Java overload
        // of foreachBatch is selected unambiguously.
        ds.writeStream().foreachBatch(
                new VoidFunction2<Dataset, Long>() {
                    @Override
                    public void call(Dataset fdsarg, Long v2) {
                        String finalstatus = "";
                        if ("true".equalsIgnoreCase(decompress)) {
                            fdsarg = fdsarg.withColumn("data",
                                    org.apache.spark.sql.functions.callUDF("decompress", fdsarg.col("data")));
                        }
                        // A fresh accumulator is created for every micro-batch.
                        accumulator = sparkSession.sparkContext().collectionAccumulator();
                        try {
                            Dataset fds;
                            if ("xml".equalsIgnoreCase(dataformat)) {
                                fds = fdsarg.as(Encoders.STRING());
                                Dataset xmldf = new XmlReader().xmlRdd(fds.sqlContext(), fds.rdd());
                                if ("true".equalsIgnoreCase(flatten)) {
                                    fds = flattendf(xmldf);
                                    if (!fds.isEmpty()) {
                                        fds = transformerBean.parse(fds);
                                    }
                                } else {
                                    // NOTE(review): emptiness is checked on the raw string batch,
                                    // not on xmldf - confirm this is intended.
                                    if (!fds.isEmpty()) {
                                        fds = transformerBean.parse(xmldf);
                                    }
                                }
                            } else if ("json".equalsIgnoreCase(dataformat)) {
                                fds = fdsarg.as(Encoders.STRING());
                                // isEmpty() alone is sufficient; a preceding count() would only
                                // add a full scan of the batch.
                                if (!fds.isEmpty()) {
                                    fds = sparkSession.read().json(fds);
                                    fds = transformerBean.parse(fds);
                                }
                            } else {
                                fds = fdsarg;
                                // NOTE(review): Encoders.bean(Row.class) is kept as-is; Row is not
                                // a Java bean - confirm the intended encoder.
                                if (pattern != null) {
                                    fds = fds.flatMap(
                                            new PatternBasedParserUtil(pattern), Encoders.bean(Row.class));
                                } else {
                                    if ("text".equals(dataformat)) {
                                        fds = fds.map(new SchemaBasedParserUtil(initializationBean.schema), Encoders.bean(Row.class));
                                    }
                                }
                                if (initializer.schemaStruct != null) {
                                    fds = fds.map(transformerBean, RowEncoder.apply(initializer.schemaStruct));
                                } else {
                                    fds = fds.map(transformerBean, RowEncoder.apply(ds.schema()));
                                }
                                fds = transformerBean.parse(fds);
                            }

                            if (!fds.isEmpty() && fds.columns().length > 0) {
                                fds.persist(StorageLevel.MEMORY_AND_DISK());
                                try {
                                    fds.show();
                                    processSinks(fds, sinkConnectors);
                                } finally {
                                    // Release the cached batch even when a sink fails, otherwise
                                    // every failing batch leaves its blocks cached.
                                    fds.unpersist();
                                }
                            }
                        } catch (Exception e) {
                            // Best effort: a failing batch is logged and reported through
                            // postProcess instead of terminating the streaming query.
                            finalstatus = "EXCEPTION";
                            logger.error("process exception-->", e);
                        } finally {
                            postProcess(finalstatus);
                        }
                    }
                }).option("checkpointLocation", checkpointlocation).start().awaitTermination();
    }