public Dataset readLogData()

in src/main/java/com/aws/logaggregator/connector/StorageConnector.java [46:79]


    /**
     * Loads the log data described by the source metadata into a Spark {@code Dataset}.
     *
     * <p>The source's config options must provide {@code location} (the path passed to
     * {@code load}) and {@code logformat} (the Spark data source format name). The source's
     * Spark options, when present and non-empty, are passed to the reader unchanged. For the
     * {@code text} format every row is additionally parsed: via {@code PatternBasedParserUtil}
     * when a {@code pattern} config option is set, otherwise via {@code SchemaBasedParserUtil}
     * built from {@code initBean.schema}.
     *
     * @param sparkSession Spark session used to create the reader
     * @param initBean     initializer carrying the source metadata and schema
     * @return the loaded dataset (parsed row-by-row when the format is {@code text})
     * @throws IllegalArgumentException if the source config options are missing, or if
     *                                  {@code location} or {@code logformat} is missing/blank
     */
    public Dataset readLogData(SparkSession sparkSession, LogAppInitializer initBean) {
        Map<String, String> sparkoptions =
                initBean.getMetdadataBean().getSource().getSparkoptions();
        Map<String, String> configoptions =
                initBean.getMetdadataBean().getSource().getConfigoptions();

        // Fail fast with a descriptive message rather than an opaque NPE from the lookup
        // below or from inside Spark's data source resolution (format(null) / load(null)).
        if (configoptions == null) {
            throw new IllegalArgumentException("Source configoptions are missing");
        }
        String inputDir = configoptions.get("location");
        String format = configoptions.get("logformat");
        if (inputDir == null || inputDir.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "Source configoption 'location' is required but was: " + inputDir);
        }
        if (format == null || format.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "Source configoption 'logformat' is required but was: " + format);
        }

        // SparkSession.read() is the same reader SQLContext.read() hands back; skip the
        // legacy SQLContext indirection.
        Dataset ds;
        if (sparkoptions != null && !sparkoptions.isEmpty()) {
            ds = sparkSession.read().options(sparkoptions).format(format).load(inputDir);
        } else {
            ds = sparkSession.read().format(format).load(inputDir);
        }

        if ("text".equalsIgnoreCase(format)) {
            String pattern = configoptions.get("pattern");
            // NOTE(review): Encoders.bean expects a JavaBean class with a no-arg constructor,
            // but Row is an interface, so this encoder is likely to fail at runtime. Consider
            // a row encoder built from the parser's output schema instead — confirm.
            if (pattern != null) {
                ds = ds.flatMap(
                        new PatternBasedParserUtil(pattern), Encoders.bean(Row.class));
            } else {
                ds = ds.map(new SchemaBasedParserUtil(initBean.schema), Encoders.bean(Row.class));
            }
        }

        return ds;
    }