in src/main/java/com/aws/logaggregator/processor/stream/StreamLogProcessor.java [107:220]
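/**
 * Starts the structured-streaming write path: registers a query listener for
 * observability, resolves the configured sink connectors, then processes each
 * micro-batch (optional decompression, format-specific parsing, transformation)
 * and fans the result out to every sink.
 */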
protected void writeOutput(Dataset ds,
                           LogAppInitializer initializationBean, ApplicationContext ctx,
                           String dataformat, String decompress, String pattern,
                           String flatten, String checkpointlocation) throws Exception {
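    // Log query lifecycle events (start, termination, per-batch progress).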
    sparkSession.streams().addListener(new StreamingQueryListener() {
        @Override
        public void onQueryStarted(QueryStartedEvent queryStarted) {
            logger.info("Query started: " + queryStarted.id());
        }

        @Override
        public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
            logger.info("Query terminated: " + queryTerminated.id());
        }

        @Override
        public void onQueryProgress(QueryProgressEvent queryProgress) {
            if (queryProgress.progress().numInputRows() > 0) {
                logger.debug("Query progress: " + queryProgress.progress());
            }
        }
    });
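    // Resolve one sink connector per configured destination up front; the same
    // connector instances are reused for every micro-batch.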
    List<BaseLogConnector> sinkConnectors = new ArrayList<>();
    for (LogAggregatorMetadata.Logaggregator.Destination destination
            : initializer.getMetdadataBean().getDestination()) {
        BaseLogConnector sinkConnector = (BaseLogConnector) facBean.getBean(
                ctx, destination.getConfigoptions().get("destinationtype"));
        sinkConnectors.add(sinkConnector);
    }
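    // foreachBatch hands each micro-batch to the parse-and-sink pipeline below;
    // awaitTermination() blocks until the query stops or fails.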
    ds.writeStream().foreachBatch(
            new VoidFunction2<Dataset, Long>() {
                @Override
                public void call(Dataset fdsarg, Long batchId) {
                    String finalstatus = "";
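                    // Optionally decompress the raw payload column via the
                    // registered "decompress" UDF before parsing.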
if ("true".equalsIgnoreCase(decompress)) {
fdsarg = fdsarg.withColumn("data",
org.apache.spark.sql.functions.callUDF("decompress", fdsarg.col("data")));
}
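                    // Fresh collection accumulator for this micro-batch.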
                    accumulator = sparkSession.sparkContext().collectionAccumulator();
                    try {
                        Dataset fds = null;
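                        // Parse the batch according to the configured data format
                        // (xml, json, or pattern/schema-based text).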
if ("xml".equalsIgnoreCase(dataformat)) {
fds = fdsarg.as(Encoders.STRING());
Dataset xmldf = new XmlReader().xmlRdd(fds.sqlContext(), fds.rdd());
if ("true".equalsIgnoreCase(flatten)) {
fds = flattendf(xmldf);
if (!fds.isEmpty()) {
fds = transformerBean.parse(fds);
}
} else {
if (!fds.isEmpty()) {
fds = transformerBean.parse(xmldf);
}
}
} else if ("json".equalsIgnoreCase(dataformat)) {
fds = fdsarg.as(Encoders.STRING());
if (fds.count() > 0 && !fds.isEmpty()) {
fds = sparkSession.read().json(fds);
fds = transformerBean.parse(fds);
}
                        } else {
                            fds = fdsarg;
                            if (pattern != null) {
                                fds = fds.flatMap(
                                        new PatternBasedParserUtil(pattern), Encoders.bean(Row.class));
                            } else if ("text".equals(dataformat)) {
                                fds = fds.map(new SchemaBasedParserUtil(initializationBean.schema),
                                        Encoders.bean(Row.class));
                            }
                            if (initializer.schemaStruct != null) {
                                fds = fds.map(transformerBean, RowEncoder.apply(initializer.schemaStruct));
                            } else {
                                fds = fds.map(transformerBean, RowEncoder.apply(ds.schema()));
                            }
                            fds = transformerBean.parse(fds);
                        }
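                        // Cache the parsed batch once: it is displayed for debugging
                        // and then written to every configured sink.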
                        if (!fds.isEmpty() && fds.columns().length > 0) {
                            fds.persist(StorageLevel.MEMORY_AND_DISK());
                            fds.show();
                            processSinks(fds, sinkConnectors);
                            fds.unpersist();
                        }
                    } catch (Exception e) {
                        // Swallow the exception (rather than rethrowing) so the
                        // streaming query keeps running; the failure is reported
                        // through the status handed to postProcess.
                        finalstatus = "EXCEPTION";
                        logger.error("process exception-->", e);
                    } finally {
                        postProcess(finalstatus);
                    }
                }
            }).option("checkpointLocation", checkpointlocation).start().awaitTermination();
}
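
// ---------------------------------------------------------------------------
// The batch pipeline above assumes a UDF named "decompress" has already been
// registered on the SparkSession; that registration lives outside this excerpt.
// The sketch below is a hypothetical illustration of what such a registration
// could look like with the standard Spark Java UDF API. The class name, the
// GZIP payload format, and the UTF-8 output encoding are assumptions for
// illustration, not code from this repository.
// ---------------------------------------------------------------------------

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public final class DecompressUdfSketch {

    // Registers a "decompress" UDF that inflates a GZIP-compressed byte array
    // into a UTF-8 string (readAllBytes requires Java 9+).
    public static void register(SparkSession spark) {
        spark.udf().register("decompress", (UDF1<byte[], String>) bytes -> {
            if (bytes == null) {
                return null; // propagate nulls untouched
            }
            try (GZIPInputStream in =
                         new GZIPInputStream(new ByteArrayInputStream(bytes))) {
                return new String(in.readAllBytes(), StandardCharsets.UTF_8);
            }
        }, DataTypes.StringType);
    }
}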