in src/main/java/com/aws/logaggregator/connector/StorageConnector.java [46:79]
/**
 * Reads log data from the storage source described by the initializer's metadata.
 *
 * <p>The source's {@code configoptions} must contain {@code logformat}, the Spark data-source
 * format. Its {@code location} entry is the path handed to {@code load}. Any non-empty
 * {@code sparkoptions} are forwarded to the reader as-is.
 *
 * <p>For the {@code text} format the loaded rows are additionally parsed:
 * <ul>
 *   <li>with {@code PatternBasedParserUtil} when a {@code pattern} config option is present;</li>
 *   <li>otherwise with {@code SchemaBasedParserUtil} built from {@code initBean.schema}.</li>
 * </ul>
 *
 * @param sparkSession session used to create the reader
 * @param initBean supplies the source metadata and, for schema-based text parsing, the schema
 * @return the loaded dataset, parsed when the format is {@code text}
 * @throws IllegalArgumentException if the source has no {@code configoptions} or its
 *     {@code logformat} is missing or blank
 */
public Dataset readLogData(SparkSession sparkSession, LogAppInitializer initBean) {
  Map<String, String> sparkoptions =
      initBean.getMetdadataBean().getSource().getSparkoptions();
  Map<String, String> configoptions =
      initBean.getMetdadataBean().getSource().getConfigoptions();

  // Fail fast with a clear message instead of an NPE (here, or deep inside Spark's
  // data-source lookup when format is null).
  if (configoptions == null) {
    throw new IllegalArgumentException("Source metadata has no configoptions");
  }
  String inputDir = configoptions.get("location");
  String format = configoptions.get("logformat");
  if (format == null || format.trim().isEmpty()) {
    throw new IllegalArgumentException("Source configoption 'logformat' is missing or blank");
  }

  // SparkSession.read() is the reader SQLContext.read() delegates to; call it directly.
  Dataset ds;
  if (sparkoptions == null || sparkoptions.isEmpty()) {
    ds = sparkSession.read().format(format).load(inputDir);
  } else {
    ds = sparkSession.read().options(sparkoptions).format(format).load(inputDir);
  }

  if ("text".equalsIgnoreCase(format)) {
    String pattern = configoptions.get("pattern");
    // NOTE(review): Encoders.bean expects a concrete JavaBean class, but Row is an interface,
    // so this encoder probably does not describe the parsed columns. Consider
    // RowEncoder.apply(schema) / Encoders.row(schema) once the output schema is confirmed.
    if (pattern != null) {
      ds = ds.flatMap(new PatternBasedParserUtil(pattern), Encoders.bean(Row.class));
    } else {
      ds = ds.map(new SchemaBasedParserUtil(initBean.schema), Encoders.bean(Row.class));
    }
  }
  return ds;
}