in src/main/java/com/aws/logaggregator/processor/stream/StreamLogProcessor.java [66:104]
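    /**
     * Streams log data for the given metadata key: resolves the source connector
     * bean named by the "sourcetype" config option, reads the logs into a Dataset,
     * and hands the result to writeOutput together with the format, pattern,
     * decompress and checkpoint-location options. Returns early if no metadata
     * was loaded for the key.
     */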
    public void process(String type, ApplicationContext ctx) throws LogAggregatorException {
        logger.info("Data processing begin -------> " + type);
        if (initializer == null || initializer.getMetdadataBean() == null) {
            logger.error("EXITING --> metadata doesn't exist, or the key --> " + type
                    + " <-- you passed is missing from the metadata");
            return;
        }
        // NOTE: the schema check (initializer.schemaStruct) is currently disabled for
        // stream processing, so records flow through without schema enforcement.
        // All source settings come from the "configoptions" map in the metadata bean.
        Map<String, String> configOptions =
                initializer.getMetdadataBean().getSource().getConfigoptions();
        String sourceType = configOptions.get("sourcetype");
        if ("storage".equalsIgnoreCase(sourceType)) {
            // Storage sources are served by the streaming variant of the connector bean.
            sourceType = sourceType + "streaming";
        }
        String dataformat = configOptions.get("logformat");
        String pattern = configOptions.get("pattern");
        String flatten = configOptions.get("flatten"); // read but not used below
        String checkpointlocation = configOptions.get("checkpointlocation");
        String decompress = configOptions.get("decompress");
        // Resolve the connector bean whose name matches the configured source type,
        // then read the logs as a (streaming) Dataset.
        BaseLogConnector sourceConnector = (BaseLogConnector) facBean.getBean(ctx, sourceType);
        Dataset<Row> ds = sourceConnector.readLogData(sparkSession, initializer);
        // NOTE: per-row validation and transformation (a filter through the validator,
        // then a map with RowEncoder.apply(initializer.schemaStruct)) is currently
        // disabled; the dataset is written out exactly as it was read.
        try {
            writeOutput(ds, initializer, ctx, dataformat, decompress, pattern, sourceType,
                    checkpointlocation);
        } catch (Exception e) {
            // Log and swallow so a failed write doesn't take down the caller.
            logger.error("Exception in streaming -->", e);
        }
    }
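
Since writeOutput is only visible here as a call site, the sketch below shows one plausible shape for the streaming write it performs, using Spark Structured Streaming. The method name writeLogStream, the outputPath parameter, and the file-sink handling are assumptions for illustration, not the project's actual implementation:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

// Hypothetical stand-in for writeOutput: writes a streaming Dataset to a file sink
// in the configured format, checkpointing so the query can recover after restarts.
static void writeLogStream(Dataset<Row> ds, String dataformat, String outputPath,
                           String checkpointlocation) throws Exception {
    StreamingQuery query = ds.writeStream()
            .format(dataformat)                               // e.g. "parquet" or "json"
            .option("path", outputPath)                       // sink directory for file formats
            .option("checkpointLocation", checkpointlocation) // enables restart/recovery
            .outputMode("append")
            .start();
    query.awaitTermination();                                 // run until stopped or failed
}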