public Dataset readLogData()

in src/main/java/com/aws/logaggregator/connector/KinesisConnector.java [52:123]


    /**
     * Builds a streaming dataset over the Kinesis source described by the application metadata.
     *
     * <p>When the source config option {@code decompress} is {@code "true"} (case-insensitive),
     * a UDF named {@code decompress} is registered on the session; it gunzips a binary payload
     * and decodes it as UTF-8 text. The UDF is only registered here, not applied to the
     * returned dataset.
     *
     * <p>The source config option {@code logformat} selects the projection:
     * <ul>
     *   <li>{@code json} - the {@code data} column is returned as loaded (no cast, no parsing);</li>
     *   <li>{@code text} / {@code xml} - the {@code data} column is cast to a string.</li>
     * </ul>
     *
     * @param session  Spark session used to register the UDF and open the stream
     * @param initBean initializer whose metadata bean supplies the source definition
     *                 (config options and Spark reader options)
     * @return a streaming dataset with a single {@code data} column, or {@code null} when the
     *         log format is not one of json/text/xml or when any exception is raised while
     *         building the stream (the exception is logged, not rethrown)
     */
    public Dataset readLogData(SparkSession session, LogAppInitializer initBean) {
        try {
            LogAggregatorMetadata.Logaggregator.Source source = initBean.getMetdadataBean().getSource();

            if ("true".equalsIgnoreCase(source.getConfigoptions().get("decompress"))) {
                UDF1<byte[], String> decompress = new UDF1<byte[], String>() {
                    @Override
                    public String call(byte[] compressed) throws Exception {
                        // Propagate SQL NULL instead of failing with a NullPointerException.
                        if (compressed == null) {
                            return null;
                        }
                        final StringBuilder outStr = new StringBuilder();
                        // try-with-resources releases the inflater even when the payload is
                        // truncated or corrupt and reading throws part-way through.
                        try (GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(compressed));
                             BufferedReader bufferedReader =
                                     new BufferedReader(new InputStreamReader(gis, StandardCharsets.UTF_8))) {
                            String line;
                            // readLine() strips line terminators, so lines are concatenated
                            // without separators.
                            while ((line = bufferedReader.readLine()) != null) {
                                outStr.append(line);
                            }
                        }
                        return outStr.toString();
                    }
                };

                session.udf().register("decompress", decompress, DataTypes.StringType);
            }

            final String logFormat = source.getConfigoptions().get("logformat");
            final boolean rawPayload = "json".equalsIgnoreCase(logFormat);
            final boolean stringPayload = "text".equalsIgnoreCase(logFormat)
                    || "xml".equalsIgnoreCase(logFormat);

            // Unsupported format: bail out before touching the stream reader.
            if (!rawPayload && !stringPayload) {
                return null;
            }

            Dataset<Row> records = session
                    .readStream()
                    .format("kinesis")
                    .options(source.getSparkoptions())
                    .load();

            if (rawPayload) {
                // JSON payloads are handed back untouched; no schema is applied here.
                return records.select("data");
            }
            // Text and XML payloads are exposed as strings.
            return records.selectExpr("CAST(data AS STRING) as data");

        } catch (Exception e) {
            //TODO need to decide on what actions to be taken
            logger.error("Error thrown", e);
        }
        return null;
    }