in src/main/java/com/aws/logaggregator/processor/batch/BatchLogProcessor.java [47:75]
public void process(String type, ApplicationContext ctx) throws LogAggregatorException {
logger.info("Data Processing Begin------->" + type);
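// Bail out early if metadata was never loaded or the requested key has no entry in it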
if (initializer == null || initializer.getMetdadataBean() == null) {
logger.error("EXITING-->Metadata doesn't exist or the key--> " + type + "<--you have passed is missing in the metadata");
return;
}
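// Resolve the source connector for the configured source type and read the raw log data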
String sourceType = initializer.getMetdadataBean().getSource().getConfigoptions().get("sourcetype");
BaseLogConnector sourceConnector = (BaseLogConnector) facBean.getBean(ctx, sourceType);
Dataset<Row> ds = sourceConnector.readLogData(sparkSession, initializer);
// Map raw records through the transformer only when no explicit schema is configured
if (initializer.schemaStruct == null) {
ds = ds
// .filter((FilterFunction<Row>) row -> validator.validate(row, accumulator, initBean.partitionIndex))
.map(transformerBean, RowEncoder.apply(ds.schema()));
}
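// Parse the dataset and cache it so it can be written to multiple sinks without recomputation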
ds = transformerBean.parse(ds);
ds.persist(StorageLevel.MEMORY_AND_DISK());
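// Resolve one sink connector per configured destination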
List<BaseLogConnector> sinkConnectors = new ArrayList<>();
for (LogAggregatorMetadata.Logaggregator.Destination sink : initializer.getMetdadataBean().getDestination()) {
BaseLogConnector sinkConnector = (BaseLogConnector) facBean.getBean(ctx, sink.getConfigoptions().get("destinationtype"));
sinkConnectors.add(sinkConnector);
}
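// Fan the parsed dataset out to every configured sink, then release the cached data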
processSinks(ds, sinkConnectors);
ds.unpersist();
}