public JavaPairRDD bulidSessionItermRDD()

in core/src/main/java/org/apache/sdap/mudrod/weblog/structure/session/SessionExtractor.java [266:337]


  /**
   * Builds a pair RDD keyed by {@code "sessionID,item"} for every distinct
   * (session, item) combination found in the click stream, restricted to
   * sessions that have at least two distinct such combinations.
   *
   * <p>Steps:
   * <ol>
   *   <li>map each click to (sessionID, viewDataset) and de-duplicate;</li>
   *   <li>count the distinct pairs per session and drop sessions whose count
   *       is below 2;</li>
   *   <li>left-outer-join the surviving sessions back to their items and emit
   *       {@code (sessionID + "," + item, 1.0)}.</li>
   * </ol>
   *
   * @param clickstreamRDD clicks to process; each click supplies a session ID
   *                       and the value of {@code getViewDataset()}
   * @return pair RDD whose key is {@code sessionID + "," + item} and whose
   *         value is always {@code 1.0}
   */
  public JavaPairRDD<String, Double> bulidSessionItermRDD(JavaRDD<ClickStream> clickstreamRDD) {
    // Distinct (sessionID, item) pairs: repeated views of the same item
    // within one session collapse to a single record.
    JavaPairRDD<String, String> sessionItemRDD = clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, String>() {
      private static final long serialVersionUID = 1L;

      @Override
      public Tuple2<String, String> call(ClickStream click) throws Exception {
        return new Tuple2<>(click.getSessionID(), click.getViewDataset());
      }
    }).distinct();

    // One occurrence of the session ID per distinct pair above.
    JavaRDD<String> sessionIds = sessionItemRDD.keys();

    JavaPairRDD<String, Double> sessionOnes = sessionIds.mapToPair(new PairFunction<String, String, Double>() {
      private static final long serialVersionUID = 1L;

      @Override
      public Tuple2<String, Double> call(String sessionId) throws Exception {
        return new Tuple2<>(sessionId, 1.0);
      }
    });

    // Number of distinct pairs per session.
    JavaPairRDD<String, Double> sessionCounts = sessionOnes.reduceByKey(new Function2<Double, Double, Double>() {
      private static final long serialVersionUID = 1L;

      @Override
      public Double call(Double left, Double right) throws Exception {
        return left + right;
      }
    });

    // Remove sessions with fewer than two distinct pairs. The counts are sums
    // of 1.0, so ">= 2" is exactly the complement of "< 2" here.
    JavaPairRDD<String, Double> sessionItemNumRDD = sessionCounts.filter(new Function<Tuple2<String, Double>, Boolean>() {
      private static final long serialVersionUID = 1L;

      @Override
      public Boolean call(Tuple2<String, Double> sessionCount) throws Exception {
        return sessionCount._2 >= 2;
      }
    });

    // Re-attach the items to the sessions that survived the filter.
    JavaPairRDD<String, Tuple2<Double, Optional<String>>> sessionsWithItems = sessionItemNumRDD.leftOuterJoin(sessionItemRDD);

    return sessionsWithItems.mapToPair(new PairFunction<Tuple2<String, Tuple2<Double, Optional<String>>>, String, Double>() {
      private static final long serialVersionUID = 1L;

      @Override
      public Tuple2<String, Double> call(Tuple2<String, Tuple2<Double, Optional<String>>> joined) throws Exception {
        Optional<String> maybeItem = joined._2._2;
        // An absent right-hand side of the outer join yields an empty item name.
        String item = maybeItem.isPresent() ? maybeItem.get() : "";
        return new Tuple2<>(joined._1 + "," + item, 1.0);
      }
    });
  }