public JavaRDD extractRankingTrainDataInParallel()

in core/src/main/java/org/apache/sdap/mudrod/weblog/structure/session/SessionExtractor.java [439:482]


  /**
   * Builds an RDD of ranking training data by processing sessions in parallel.
   *
   * <p>The method lists every log index whose name starts with the configured
   * {@code MudrodConstants.LOG_INDEX} prefix, gathers the session strings of
   * each index via {@code getSessions}, distributes them over 16 Spark
   * partitions, and converts every session into {@link RankingTrainData}
   * records inside each partition.
   *
   * <p>Each partition opens its own {@link ESDriver}; the driver and its bulk
   * processor are released even when processing a session fails.
   *
   * @param props configuration; supplies the log index prefix and is handed to
   *              the per-partition {@link ESDriver} and {@link Session}
   * @param spark Spark driver whose context ({@code spark.sc}) parallelizes the
   *              session list
   * @param es    Elasticsearch driver used on the calling side to list indices
   *              and fetch sessions
   * @return RDD holding the ranking training data of all sessions
   */
  public JavaRDD<RankingTrainData> extractRankingTrainDataInParallel(Properties props, SparkDriver spark, ESDriver es) {

    List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX));

    LOG.info(logIndexList.toString());

    // Collect the session strings of every matching log index on the driver side.
    List<String> sessionIdList = new ArrayList<>();
    for (String logIndex : logIndexList) {
      sessionIdList.addAll(this.getSessions(props, es, logIndex));
    }

    JavaRDD<String> sessionRDD = spark.sc.parallelize(sessionIdList, 16);

    JavaRDD<RankingTrainData> clickStreamRDD = sessionRDD.mapPartitions(
            new FlatMapFunction<Iterator<String>, RankingTrainData>() {
      private static final long serialVersionUID = 1L;

      @Override
      public Iterator<RankingTrainData> call(Iterator<String> arg0) throws Exception {
        // One ES connection per partition rather than per session.
        ESDriver tmpES = new ESDriver(props);
        try {
          tmpES.createBulkProcessor();
          try {
            Session session = new Session(props, tmpES);
            List<RankingTrainData> clickstreams = new ArrayList<>();
            while (arg0.hasNext()) {
              // Each element is a comma-separated string with at least three
              // fields; a shorter string raises ArrayIndexOutOfBoundsException
              // and fails the task.
              // NOTE(review): the meaning of the individual fields is defined
              // by getSessions/getRankingTrainData - confirm there.
              String[] sArr = arg0.next().split(",");
              clickstreams.addAll(session.getRankingTrainData(sArr[1], sArr[2], sArr[0]));
            }
            return clickstreams.iterator();
          } finally {
            // Always tear down the bulk processor, even if a session failed.
            tmpES.destroyBulkProcessor();
          }
        } finally {
          // Always release the ES client so a failing task does not leak it.
          tmpES.close();
        }
      }
    });

    // NOTE(review): count() runs a Spark job and the RDD is not persisted here,
    // so later actions by the caller presumably recompute it - confirm whether
    // caching is wanted.
    LOG.info("Clickstream number: {}", clickStreamRDD.count());

    return clickStreamRDD;
  }