in core/src/main/java/org/apache/sdap/mudrod/weblog/structure/session/SessionExtractor.java [439:482]
/**
 * Builds an RDD of ranking training data from the sessions stored in every
 * log index whose name starts with the configured log-index prefix.
 *
 * <p>The session entries of all matching indices are gathered on the driver
 * via {@code getSessions}, distributed over 16 partitions, and each
 * partition converts its entries with its own {@link ESDriver} and
 * {@link Session} instance.
 *
 * @param props configuration; read for {@code MudrodConstants.LOG_INDEX} and
 *              passed on to every per-partition {@link ESDriver} and
 *              {@link Session}
 * @param spark driver whose Spark context is used to parallelize the session
 *              entries
 * @param es    Elasticsearch driver used on the driver side to list the
 *              indices and fetch the session entries
 * @return RDD holding the ranking training data of all sessions
 */
public JavaRDD<RankingTrainData> extractRankingTrainDataInParallel(Properties props, SparkDriver spark, ESDriver es) {
  List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX));
  LOG.info(logIndexList.toString());

  // Gather the session entries of every matching index on the driver.
  List<String> sessionIdList = new ArrayList<>();
  for (String logIndex : logIndexList) {
    sessionIdList.addAll(this.getSessions(props, es, logIndex));
  }

  JavaRDD<String> sessionRDD = spark.sc.parallelize(sessionIdList, 16);

  JavaRDD<RankingTrainData> clickStreamRDD = sessionRDD.mapPartitions(
      new FlatMapFunction<Iterator<String>, RankingTrainData>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Iterator<RankingTrainData> call(Iterator<String> arg0) throws Exception {
          // One ESDriver per partition: it is created inside the task rather
          // than captured from the enclosing method.
          ESDriver tmpES = new ESDriver(props);
          try {
            tmpES.createBulkProcessor();
            try {
              Session session = new Session(props, tmpES);
              List<RankingTrainData> clickstreams = new ArrayList<>();
              while (arg0.hasNext()) {
                // Each entry is a comma-separated record; fields 1, 2 and 0
                // are handed to getRankingTrainData in that order.
                String[] sArr = arg0.next().split(",");
                clickstreams.addAll(session.getRankingTrainData(sArr[1], sArr[2], sArr[0]));
              }
              return clickstreams.iterator();
            } finally {
              // Runs even when an entry fails, so the bulk processor is not leaked.
              tmpES.destroyBulkProcessor();
            }
          } finally {
            // Always release the per-partition client, also on failure.
            tmpES.close();
          }
        }
      });

  // NOTE(review): count() is a Spark action and evaluates the RDD here; the
  // RDD is not persisted, so later actions by the caller will presumably
  // recompute it - confirm whether caching is wanted.
  LOG.info("Clickstream number: {}", clickStreamRDD.count());
  return clickStreamRDD;
}