public static LabeledRowMatrix createDocWordMatrix()

in core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java [223:352]


  /**
   * Builds a document-by-word term-count matrix from tokenized documents.
   *
   * <p>Every distinct word across all documents becomes one column. Column indices are
   * assigned in ascending natural {@link String} order, so {@code colkeys.get(i)} is the word
   * stored in column {@code i}. Every document key with at least one word becomes one row, and
   * rows are ordered by document key. The entry at (row, column) is the number of times that
   * word occurs in that document. Document keys whose word list is empty produce no row.
   * Repeated document keys are merged into a single row whose counts are summed.
   *
   * <p>Word indexing and row order are deliberately deterministic. The intermediate RDDs are
   * evaluated more than once (counted, joined, collected and, lazily, read again through the
   * returned {@link RowMatrix}). Spark does not guarantee a stable element order for shuffled
   * RDDs across re-evaluations, so without sorting the labels could drift from the data.
   *
   * @param uniqueDocRDD pairs of (document key, words of that document)
   * @param sc           Spark context; not used by this method
   * @return a {@link LabeledRowMatrix} whose {@code rowkeys} label the matrix rows and whose
   *         {@code colkeys} label its columns, both in matrix order
   * @throws ArithmeticException if the vocabulary is too large to index with {@code int}
   */
  public static LabeledRowMatrix createDocWordMatrix(JavaPairRDD<String, List<String>> uniqueDocRDD, JavaSparkContext sc) {
    // 1. Assign every distinct word a column index. zipWithIndex() numbers elements in their
    //    current partition order, and distinct() does not keep that order stable between
    //    evaluations. Sorting first makes the word -> index mapping reproducible.
    JavaPairRDD<String, Long> wordIDRDD = uniqueDocRDD.values().flatMap(new FlatMapFunction<List<String>, String>() {
      private static final long serialVersionUID = 1L;

      @Override
      public Iterator<String> call(List<String> words) throws Exception {
        return words.iterator();
      }
    }).distinct().sortBy(word -> word, true, uniqueDocRDD.partitions().size()).zipWithIndex();

    // 2. Count occurrences of each (document key, word) pair.
    JavaPairRDD<Tuple2<String, String>, Double> docWordCountRDD = uniqueDocRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<String, List<String>>, Tuple2<String, String>, Double>() {
      private static final long serialVersionUID = 1L;

      @Override
      public Iterator<Tuple2<Tuple2<String, String>, Double>> call(Tuple2<String, List<String>> doc) throws Exception {
        List<Tuple2<Tuple2<String, String>, Double>> pairs = new ArrayList<>(doc._2.size());
        for (String word : doc._2) {
          pairs.add(new Tuple2<Tuple2<String, String>, Double>(new Tuple2<>(doc._1, word), 1.0));
        }
        return pairs.iterator();
      }
    }).reduceByKey(new Function2<Double, Double, Double>() {
      private static final long serialVersionUID = 1L;

      @Override
      public Double call(Double first, Double second) throws Exception {
        return first + second;
      }
    });

    // 3. Re-key by word so the counts can be joined with the word index:
    //    (word, (document key, count)).
    JavaPairRDD<String, Tuple2<String, Double>> wordDocCountRDD = docWordCountRDD.mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Double>, String, Tuple2<String, Double>>() {
      private static final long serialVersionUID = 1L;

      @Override
      public Tuple2<String, Tuple2<String, Double>> call(Tuple2<Tuple2<String, String>, Double> entry) throws Exception {
        Tuple2<String, Double> docCount = new Tuple2<>(entry._1._1, entry._2);
        return new Tuple2<>(entry._1._2, docCount);
      }
    });

    // 4. Attach each word's column index. Every word here was drawn from uniqueDocRDD, so
    //    every word has an index. An inner join therefore drops nothing, and no placeholder
    //    index is ever needed for a "missing" word.
    JavaPairRDD<String, Tuple2<Tuple2<String, Double>, Long>> wordCountIndexRDD = wordDocCountRDD.join(wordIDRDD);

    // Number of columns. Fail loudly instead of wrapping if the vocabulary cannot be
    // addressed with int indices.
    final int wordsize = Math.toIntExact(wordIDRDD.count());

    // 5. Gather each document's (column index, count) cells into one sparse row. Then sort
    //    rows by document key so that rowkeys and the matrix rows stay aligned whenever
    //    this RDD is re-evaluated.
    JavaPairRDD<String, Vector> docVectorRDD = wordCountIndexRDD.mapToPair(new PairFunction<Tuple2<String, Tuple2<Tuple2<String, Double>, Long>>, String, List<Tuple2<Integer, Double>>>() {
      private static final long serialVersionUID = 1L;

      @Override
      public Tuple2<String, List<Tuple2<Integer, Double>>> call(Tuple2<String, Tuple2<Tuple2<String, Double>, Long>> entry) throws Exception {
        String docKey = entry._2._1._1;
        Double count = entry._2._1._2;
        Integer column = entry._2._2.intValue();
        List<Tuple2<Integer, Double>> cells = new ArrayList<>();
        cells.add(new Tuple2<Integer, Double>(column, count));
        return new Tuple2<>(docKey, cells);
      }
    }).reduceByKey(new Function2<List<Tuple2<Integer, Double>>, List<Tuple2<Integer, Double>>, List<Tuple2<Integer, Double>>>() {
      private static final long serialVersionUID = 1L;

      @Override
      public List<Tuple2<Integer, Double>> call(List<Tuple2<Integer, Double>> left, List<Tuple2<Integer, Double>> right) throws Exception {
        // Append into the left combiner rather than allocating a new list on every merge.
        left.addAll(right);
        return left;
      }
    }).mapToPair(new PairFunction<Tuple2<String, List<Tuple2<Integer, Double>>>, String, Vector>() {
      private static final long serialVersionUID = 1L;

      @Override
      public Tuple2<String, Vector> call(Tuple2<String, List<Tuple2<Integer, Double>>> entry) throws Exception {
        // Cells arrive in arbitrary column order, but a sparse vector needs strictly
        // increasing indices. This Vectors.sparse overload sorts the cells by index.
        Vector row = Vectors.sparse(wordsize, entry._2);
        return new Tuple2<>(entry._1, row);
      }
    }).sortByKey();

    LabeledRowMatrix labeledRowMatrix = new LabeledRowMatrix();
    labeledRowMatrix.rowMatrix = new RowMatrix(docVectorRDD.values().rdd());
    // Rows are sorted by key, so collecting the keys yields the labels in row order.
    labeledRowMatrix.rowkeys = docVectorRDD.keys().collect();
    // wordIDRDD is sorted, so collecting its keys yields the words in column-index order.
    labeledRowMatrix.colkeys = wordIDRDD.keys().collect();

    return labeledRowMatrix;
  }