in core/src/main/java/org/apache/sdap/mudrod/weblog/structure/session/SessionExtractor.java [266:337]
/**
 * Builds a pair RDD of (session, viewed dataset) keys for sessions that viewed at least two
 * distinct datasets.
 *
 * <p>Every click is reduced to a (session ID, viewed dataset) pair and duplicates are removed.
 * The number of distinct datasets per session is then counted, and sessions with a count below
 * two are discarded. Each surviving pair is emitted once as the key
 * {@code sessionID + "," + dataset} with the constant value {@code 1.0}.
 *
 * @param clickstreamRDD click records; only {@code getSessionID()} and {@code getViewDataset()}
 *     of each record are read
 * @return pair RDD keyed by {@code "<sessionID>,<dataset>"}, every value equal to {@code 1.0}
 */
public JavaPairRDD<String, Double> bulidSessionItermRDD(JavaRDD<ClickStream> clickstreamRDD) {
  // Click -> (sessionID, viewed dataset).
  PairFunction<ClickStream, String, String> toSessionDataset =
      new PairFunction<ClickStream, String, String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<String, String> call(ClickStream click) throws Exception {
          return new Tuple2<>(click.getSessionID(), click.getViewDataset());
        }
      };

  // sessionID -> 1.0, so that summing per key yields the number of distinct datasets.
  PairFunction<String, String, Double> countOne = new PairFunction<String, String, Double>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, Double> call(String session) throws Exception {
      return new Tuple2<>(session, 1.0);
    }
  };

  Function2<Double, Double, Double> sum = new Function2<Double, Double, Double>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Double call(Double left, Double right) throws Exception {
      return left + right;
    }
  };

  // Keep only sessions with two or more distinct viewed datasets.
  Function<Tuple2<String, Double>, Boolean> hasAtLeastTwoDatasets =
      new Function<Tuple2<String, Double>, Boolean>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Boolean call(Tuple2<String, Double> sessionCount) throws Exception {
          return sessionCount._2() >= 2;
        }
      };

  // (sessionID, (count, dataset)) -> ("sessionID,dataset", 1.0).
  PairFunction<Tuple2<String, Tuple2<Double, Optional<String>>>, String, Double> toSessionDatasetKey =
      new PairFunction<Tuple2<String, Tuple2<Double, Optional<String>>>, String, Double>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<String, Double> call(Tuple2<String, Tuple2<Double, Optional<String>>> joined)
            throws Exception {
          Optional<String> dataset = joined._2()._2();
          // Every kept session key comes from sessionDatasets, so the dataset side of the
          // outer join is always present; "" is only a defensive fallback.
          String datasetName = dataset.isPresent() ? dataset.get() : "";
          return new Tuple2<>(joined._1() + "," + datasetName, 1.0);
        }
      };

  JavaPairRDD<String, String> sessionDatasets = clickstreamRDD.mapToPair(toSessionDataset).distinct();

  JavaPairRDD<String, Double> multiDatasetSessions = sessionDatasets.keys()
      .mapToPair(countOne)
      .reduceByKey(sum)
      .filter(hasAtLeastTwoDatasets);

  return multiDatasetSessions.leftOuterJoin(sessionDatasets).mapToPair(toSessionDatasetKey);
}