src/main/java/com/aws/logaggregator/connector/KafkaConnector.java [55:79]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        try {


            LogAggregatorMetadata.Logaggregator.Source source = initBean.getMetdadataBean().getSource();

            Dataset<Row> ds = null;
            if ("true".equalsIgnoreCase(source.getConfigoptions().get("decompress"))) {
                UDF1<byte[], String> decompress = new UDF1<byte[], String>() {
                    @Override
                    public String call(byte[] compressed) throws Exception {
                        final StringBuilder outStr = new StringBuilder();
                        final ByteArrayInputStream ba = new ByteArrayInputStream(compressed);
                        final GZIPInputStream gis = new GZIPInputStream(ba);
                        final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(gis, StandardCharsets.UTF_8));

                        String line;
                        while ((line = bufferedReader.readLine()) != null) {
                            outStr.append(line);
                        }
                        ba.close();
                        gis.close();
                        bufferedReader.close();
                        return outStr.toString();
                    }
                };
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/main/java/com/aws/logaggregator/connector/KinesisConnector.java [53:77]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        try {

            LogAggregatorMetadata.Logaggregator.Source source = initBean.getMetdadataBean().getSource();

            Dataset<Row> ds = null;

            if ("true".equalsIgnoreCase(source.getConfigoptions().get("decompress"))) {
                UDF1<byte[], String> decompress = new UDF1<byte[], String>() {
                    @Override
                    public String call(byte[] compressed) throws Exception {
                        final StringBuilder outStr = new StringBuilder();
                        final ByteArrayInputStream ba = new ByteArrayInputStream(compressed);
                        final GZIPInputStream gis = new GZIPInputStream(ba);
                        final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(gis, StandardCharsets.UTF_8));

                        String line;
                        while ((line = bufferedReader.readLine()) != null) {
                            outStr.append(line);
                        }
                        ba.close();
                        gis.close();
                        bufferedReader.close();
                        return outStr.toString();
                    }
                };
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



