public Dataset readLogData()

in src/main/java/com/aws/logaggregator/connector/StorageStructuredStreamConnector.java [47:91]


    /**
     * Creates a streaming {@code Dataset} over the configured storage location
     * using {@code sparkSession.readStream()}.
     *
     * <p>The source's config options supply the {@code logformat} and
     * {@code location} entries. When {@code logformat} is {@code json} or
     * {@code xml} (case-insensitive) the data is loaded with the {@code text}
     * format; otherwise {@code logformat} itself is used as the reader format.
     * NOTE(review): json/xml payloads are presumably parsed by a later stage -
     * confirm against the callers.
     *
     * <p>Spark options from the source metadata are applied only when present
     * and non-empty, so a source that declares no Spark options is read with
     * the reader defaults.
     *
     * @param sparkSession session used to create the stream reader
     * @param initBean     initializer providing the source metadata and the schema
     *                     ({@code schemaStruct}) applied to the reader
     * @return the streaming dataset returned by the reader's {@code load}
     * @throws IllegalArgumentException if the source has no config options or no
     *                                  {@code logformat} entry
     */
    public Dataset readLogData(SparkSession sparkSession, LogAppInitializer initBean) {
        try {
            LogAggregatorMetadata.Logaggregator.Source source = initBean.getMetdadataBean().getSource();

            Map<String, String> engineOptions = source.getSparkoptions();
            Map<String, String> ancillaryOptions = source.getConfigoptions();
            if (ancillaryOptions == null) {
                throw new IllegalArgumentException("Source config options are missing; 'logformat' and 'location' are required");
            }

            String dataformat = ancillaryOptions.get("logformat");
            if (dataformat == null) {
                throw new IllegalArgumentException("Source config option 'logformat' is missing");
            }
            String inputDir = ancillaryOptions.get("location");

            // json and xml inputs are loaded with the "text" format; every other
            // format is handed to the reader unchanged.
            boolean loadAsText = dataformat.equalsIgnoreCase("json") || dataformat.equalsIgnoreCase("xml");
            String readerFormat = loadAsText ? "text" : dataformat;

            Dataset<Row> ds;
            if (engineOptions != null && !engineOptions.isEmpty()) {
                ds = sparkSession.readStream()
                        .options(engineOptions)
                        .format(readerFormat)
                        .schema(initBean.schemaStruct)
                        .load(inputDir);
            } else {
                // Never pass a null options map to the reader: options(Map)
                // dereferences it and fails with a NullPointerException.
                ds = sparkSession.readStream()
                        .format(readerFormat)
                        .schema(initBean.schemaStruct)
                        .load(inputDir);
            }

            return ds;
        } catch (Exception e) {
            logger.error("Error thrown", e);
            throw e;
        }
    }