in src/main/java/com/aws/logaggregator/connector/StorageStructuredStreamConnector.java [47:91]
/**
 * Creates a streaming dataset over the storage location described by the source metadata.
 *
 * <p>The source's config options supply the log format ({@code "logformat"}) and the input
 * location ({@code "location"}); the source's Spark options, when present, are passed to the
 * stream reader unchanged. Sources whose log format is {@code json} or {@code xml}
 * (case-insensitive) are read with Spark's {@code "text"} format; every other log format is
 * used directly as the Spark format name. The schema applied is {@code initBean.schemaStruct}.
 *
 * @param sparkSession session used to create the stream reader
 * @param initBean     initializer holding the source metadata and the schema to apply
 * @return the streaming dataset loaded from the configured location
 * @throws IllegalArgumentException if the source config options or the {@code "logformat"}
 *                                  entry are missing
 */
public Dataset readLogData(SparkSession sparkSession, LogAppInitializer initBean) {
    try {
        LogAggregatorMetadata.Logaggregator.Source source = initBean.getMetdadataBean().getSource();
        Map<String, String> engineOptions = source.getSparkoptions();
        Map<String, String> ancillaryOptions = source.getConfigoptions();
        if (ancillaryOptions == null) {
            throw new IllegalArgumentException(
                    "Source config options are missing; \"logformat\" and \"location\" are required");
        }

        String dataformat = ancillaryOptions.get("logformat");
        if (dataformat == null) {
            throw new IllegalArgumentException("Source config option \"logformat\" is missing");
        }
        String inputDir = ancillaryOptions.get("location");

        // json and xml inputs are loaded through the "text" format rather than a
        // format-specific reader; all other formats are handed to Spark as-is.
        boolean readAsText = "json".equalsIgnoreCase(dataformat) || "xml".equalsIgnoreCase(dataformat);
        String sparkFormat = readAsText ? "text" : dataformat;

        // The reader's options(Map) must never receive null; an empty map adds no options,
        // so sources without Spark options take the same code path as those with them.
        Map<String, String> readerOptions = (engineOptions != null)
                ? engineOptions
                : java.util.Collections.<String, String>emptyMap();

        Dataset<Row> ds = sparkSession.readStream()
                .options(readerOptions)
                .format(sparkFormat)
                .schema(initBean.schemaStruct)
                .load(inputDir);
        return ds;
    } catch (Exception e) {
        logger.error("Error thrown", e);
        throw e;
    }
}