in src/main/java/com/aws/logaggregator/connector/KafkaConnector.java [54:124]
/**
 * Builds a streaming Dataset over the configured Kafka source, exposing each record's
 * payload as a single column named {@code data}.
 *
 * <p>Behaviour is driven by the source's config options:
 * <ul>
 *   <li>{@code decompress=true} registers a Spark SQL UDF named {@code decompress} that
 *       gunzips a byte array into a UTF-8 string. The UDF is only registered here; it is
 *       not applied to the returned Dataset by this method.</li>
 *   <li>{@code logformat=json} keeps the payload un-cast (binary).</li>
 *   <li>{@code logformat=text} or {@code xml} casts the payload to a string.</li>
 * </ul>
 *
 * @param sparkSession active Spark session used to create the stream reader and register the UDF
 * @param initBean     application initializer holding the source metadata
 * @return the streaming Dataset with a single {@code data} column, or {@code null} when the
 *         log format is not one of json/text/xml or when any exception occurs (it is logged)
 */
public Dataset readLogData(SparkSession sparkSession, LogAppInitializer initBean) {
    try {
        LogAggregatorMetadata.Logaggregator.Source source = initBean.getMetdadataBean().getSource();

        if ("true".equalsIgnoreCase(source.getConfigoptions().get("decompress"))) {
            registerGzipDecompressUdf(sparkSession);
        }

        String logFormat = source.getConfigoptions().get("logformat");
        if ("json".equalsIgnoreCase(logFormat)) {
            // The original json branch applied no cast, so the payload stays binary here.
            // NOTE(review): confirm downstream json parsing expects binary rather than string.
            return loadKafkaValueAsData(sparkSession, source, "value AS data");
        }
        if ("text".equalsIgnoreCase(logFormat) || "xml".equalsIgnoreCase(logFormat)) {
            return loadKafkaValueAsData(sparkSession, source, "CAST(value AS STRING) AS data");
        }

        logger.warn("Unsupported logformat for Kafka source: " + logFormat);
        return null;
    } catch (Exception e) {
        //TODO need to decide on what actions to be taken
        logger.error("Error thrown", e);
    }
    return null;
}

/**
 * Registers the {@code decompress} UDF, which gunzips a byte array and decodes it as UTF-8.
 */
private static void registerGzipDecompressUdf(SparkSession sparkSession) {
    UDF1<byte[], String> decompress = new UDF1<byte[], String>() {
        @Override
        public String call(byte[] compressed) throws Exception {
            // A record without a payload has nothing to decompress; propagate the null
            // instead of failing inside ByteArrayInputStream.
            if (compressed == null) {
                return null;
            }
            final StringBuilder outStr = new StringBuilder();
            // try-with-resources closes the whole stream chain even if reading fails.
            try (BufferedReader bufferedReader = new BufferedReader(
                    new InputStreamReader(
                            new GZIPInputStream(new ByteArrayInputStream(compressed)),
                            StandardCharsets.UTF_8))) {
                String line;
                // readLine() strips line terminators and none are re-added, so a multi-line
                // payload is joined into one line.
                // NOTE(review): this matches the previous behaviour — confirm it is intended.
                while ((line = bufferedReader.readLine()) != null) {
                    outStr.append(line);
                }
            }
            return outStr.toString();
        }
    };
    sparkSession.udf().register("decompress", decompress, DataTypes.StringType);
}

/**
 * Opens the Kafka stream with the source's Spark options and projects it to a single
 * {@code data} column using the given SQL expression.
 *
 * <p>Spark's Kafka source exposes the record payload in the {@code value} column; there is
 * no {@code data} column, so the expression must alias {@code value} to {@code data}.
 */
private static Dataset<Row> loadKafkaValueAsData(SparkSession sparkSession,
                                                 LogAggregatorMetadata.Logaggregator.Source source,
                                                 String dataExpression) {
    return sparkSession
            .readStream()
            .format("kafka")
            .options(source.getSparkoptions())
            .load()
            .selectExpr(dataExpression);
}