public static List matrixToTriples()

in core/src/main/java/org/apache/sdap/mudrod/utils/SimilarityUtil.java [141:209]


  /**
   * Flattens a similarity matrix into linkage triples and resolves the row and
   * column ids of every entry to the key found at the same index in
   * {@code keys}.
   *
   * <p>Keys are numbered with {@code zipWithIndex}. Each matrix entry becomes a
   * {@link LinkageTriple} carrying its row id, column id and value. The triples
   * are then left-outer-joined against the index-to-key pairs twice: first on
   * the row id to fill {@code keyA}, then on the column id to fill
   * {@code keyB}. When an id has no matching key, the corresponding name field
   * is left as the triple's constructor set it.
   *
   * @param keys      keys whose {@code zipWithIndex} position is matched
   *                  against the matrix row and column ids
   * @param simMatirx matrix whose entries supply the ids and the weight
   * @return the triples collected into a list, or {@code null} when the number
   *         of matrix columns differs from the number of keys
   */
  public static List<LinkageTriple> matrixToTriples(JavaRDD<String> keys, CoordinateMatrix simMatirx) {
    // Guard: the matrix must have exactly one column per key.
    final long columnCount = simMatirx.numCols();
    final long keyCount = keys.count();
    if (columnCount != keyCount) {
      return null;
    }

    // (key, index) -> (index, key), so the index can serve as the join key.
    Function<Tuple2<String, Long>, Tuple2<Long, String>> indexFirst = new Function<Tuple2<String, Long>, Tuple2<Long, String>>() {
      private static final long serialVersionUID = 1L;

      @Override
      public Tuple2<Long, String> call(Tuple2<String, Long> keyWithIndex) {
        return keyWithIndex.swap();
      }
    };

    // Matrix entry -> triple holding ids and weight, keyed by the row id.
    PairFunction<MatrixEntry, Long, LinkageTriple> entryToRowKeyed = new PairFunction<MatrixEntry, Long, LinkageTriple>() {
      private static final long serialVersionUID = 1L;

      @Override
      public Tuple2<Long, LinkageTriple> call(MatrixEntry entry) throws Exception {
        LinkageTriple linkage = new LinkageTriple();
        linkage.keyAId = entry.i();
        linkage.keyBId = entry.j();
        linkage.weight = entry.value();
        return new Tuple2<>(linkage.keyAId, linkage);
      }
    };

    // Row join result -> triple with keyA filled in (if found), re-keyed by the column id.
    PairFunction<Tuple2<LinkageTriple, Optional<String>>, Long, LinkageTriple> fillKeyA = new PairFunction<Tuple2<LinkageTriple, Optional<String>>, Long, LinkageTriple>() {
      private static final long serialVersionUID = 1L;

      @Override
      public Tuple2<Long, LinkageTriple> call(Tuple2<LinkageTriple, Optional<String>> joined) throws Exception {
        LinkageTriple linkage = joined._1;
        Optional<String> rowKey = joined._2;
        if (rowKey.isPresent()) {
          linkage.keyA = rowKey.get();
        }
        return new Tuple2<>(linkage.keyBId, linkage);
      }
    };

    // Column join result -> triple with keyB filled in (if found).
    Function<Tuple2<LinkageTriple, Optional<String>>, LinkageTriple> fillKeyB = new Function<Tuple2<LinkageTriple, Optional<String>>, LinkageTriple>() {
      private static final long serialVersionUID = 1L;

      @Override
      public LinkageTriple call(Tuple2<LinkageTriple, Optional<String>> joined) throws Exception {
        LinkageTriple linkage = joined._1;
        Optional<String> columnKey = joined._2;
        if (columnKey.isPresent()) {
          linkage.keyB = columnKey.get();
        }
        return linkage;
      }
    };

    // Index-to-key lookup used by both joins below.
    JavaPairRDD<Long, String> indexToKey = JavaPairRDD.fromJavaRDD(keys.zipWithIndex().map(indexFirst));

    JavaPairRDD<Long, LinkageTriple> byRowId = simMatirx.entries().toJavaRDD().mapToPair(entryToRowKeyed);
    JavaPairRDD<Long, LinkageTriple> byColumnId = byRowId.leftOuterJoin(indexToKey).values().mapToPair(fillKeyA);
    JavaRDD<LinkageTriple> resolved = byColumnId.leftOuterJoin(indexToKey).values().map(fillKeyB);

    return resolved.collect();
  }