in core/src/main/java/org/apache/sdap/mudrod/weblog/structure/session/SessionExtractor.java [76:119]
/**
 * Builds the click streams of all sessions in parallel with Spark.
 *
 * <p>The session entries of every index returned by
 * {@code es.getIndexListWithPrefix} for the {@code MudrodConstants.LOG_INDEX}
 * property are collected on the driver, distributed over 16 partitions, and
 * each partition resolves its entries to click streams through its own
 * {@link ESDriver} instance.
 *
 * @param props configuration; supplies the log index property and is used to
 *              create the per-partition {@link ESDriver} and {@link Session}
 * @param spark driver whose {@code sc} context is used to parallelize the
 *              session entries
 * @param es    driver used on the calling side to list indices and sessions
 * @return RDD holding the click streams of all collected sessions
 */
private JavaRDD<ClickStream> getClickStreamListInParallel(Properties props, SparkDriver spark, ESDriver es) {
  List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX));
  // Pass the list itself: the logger renders it only if INFO is enabled.
  LOG.info("Retrieved {}", logIndexList);

  // Gather the session entries of every log index on the driver.
  List<String> sessionIdList = new ArrayList<>();
  for (String logIndex : logIndexList) {
    sessionIdList.addAll(this.getSessions(props, es, logIndex));
  }

  JavaRDD<String> sessionRDD = spark.sc.parallelize(sessionIdList, 16);

  JavaRDD<ClickStream> clickStreamRDD = sessionRDD.mapPartitions(
      new FlatMapFunction<Iterator<String>, ClickStream>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Iterator<ClickStream> call(Iterator<String> sessionIds) throws Exception {
          // A dedicated driver is created per partition instead of reusing the
          // caller's `es` inside the Spark function.
          ESDriver tmpES = new ESDriver(props);
          try {
            tmpES.createBulkProcessor();
            try {
              Session session = new Session(props, tmpES);
              List<ClickStream> clickstreams = new ArrayList<>();
              while (sessionIds.hasNext()) {
                // Each entry is split on ','; fields 1, 2 and 0 are handed to
                // getClickStreamList in that order.
                // NOTE(review): presumably every entry has at least three
                // fields — the producer (getSessions) is not visible here.
                String[] sArr = sessionIds.next().split(",");
                clickstreams.addAll(session.getClickStreamList(sArr[1], sArr[2], sArr[0]));
              }
              return clickstreams.iterator();
            } finally {
              // Only reached once the bulk processor was created, so it is
              // always torn down, even when a lookup above throws.
              tmpES.destroyBulkProcessor();
            }
          } finally {
            // Close the per-partition driver on success and on failure.
            tmpES.close();
          }
        }
      });

  // NOTE(review): count() is a Spark action and evaluates the whole RDD here;
  // the RDD is not persisted in this method, so a later action on the returned
  // RDD will presumably recompute it — confirm whether the caller caches it.
  LOG.info("Clickstream number: {}", clickStreamRDD.count());
  return clickStreamRDD;
}