in core/src/main/java/org/apache/sdap/mudrod/weblog/structure/session/SessionExtractor.java [76:119]
  /**
   * Builds an RDD of click streams by processing session descriptors in
   * parallel Spark partitions.
   *
   * <p>The session descriptors are first gathered on the driver from every
   * index returned by {@code es.getIndexListWithPrefix} for the configured
   * {@link MudrodConstants#LOG_INDEX} property, then distributed over 16
   * partitions. Each partition opens its own {@link ESDriver}, resolves the
   * click streams for its sessions, and always releases the driver afterwards,
   * even when processing fails part-way through.
   *
   * <p>Each session descriptor is split on {@code ","}; elements 1, 2 and 0
   * (in that order) are passed to {@code Session.getClickStreamList}, so a
   * descriptor with fewer than three fields fails the partition.
   *
   * @param props the Mudrod configuration, also used to create the
   *              per-partition {@link ESDriver} instances
   * @param spark the Spark driver whose context parallelizes the session list
   * @param es    the Elasticsearch driver used on the Spark driver side to
   *              list indices and sessions
   * @return the RDD of click streams extracted from all sessions
   */
  private JavaRDD<ClickStream> getClickStreamListInParallel(Properties props, SparkDriver spark, ESDriver es) {
    List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX));
    LOG.info("Retrieved {}", logIndexList);

    // Gather the session descriptors of every matching index on the driver.
    List<String> sessionIdList = new ArrayList<>();
    for (String logIndex : logIndexList) {
      sessionIdList.addAll(this.getSessions(props, es, logIndex));
    }

    JavaRDD<String> sessionRDD = spark.sc.parallelize(sessionIdList, 16);
    JavaRDD<ClickStream> clickStreamRDD = sessionRDD.mapPartitions(
            new FlatMapFunction<Iterator<String>, ClickStream>() {
      private static final long serialVersionUID = 1L;

      @Override
      public Iterator<ClickStream> call(Iterator<String> arg0) throws Exception {
        // One ES client per partition: the driver-side instance is not reused here.
        ESDriver tmpES = new ESDriver(props);
        List<ClickStream> clickstreams = new ArrayList<>();
        try {
          tmpES.createBulkProcessor();
          Session session = new Session(props, tmpES);
          while (arg0.hasNext()) {
            String s = arg0.next();
            String[] sArr = s.split(",");
            List<ClickStream> clicks = session.getClickStreamList(sArr[1], sArr[2], sArr[0]);
            clickstreams.addAll(clicks);
          }
        } finally {
          // Release the ES resources on every exit path; the nested finally
          // guarantees close() runs even if destroying the bulk processor fails.
          try {
            tmpES.destroyBulkProcessor();
          } finally {
            tmpES.close();
          }
        }
        return clickstreams.iterator();
      }
    });

    // count() is a Spark action and forces evaluation here.
    // NOTE(review): the RDD is not cached in this method, so later actions
    // on it will presumably recompute the partitions - confirm whether the
    // caller caches it.
    LOG.info("Clickstream number: {}", clickStreamRDD.count());
    return clickStreamRDD;
  }