in core/src/main/java/org/apache/sdap/mudrod/utils/SimilarityUtil.java [141:209]
public static List<LinkageTriple> matrixToTriples(JavaRDD<String> keys, CoordinateMatrix simMatirx) {
  // Turns every entry of the similarity matrix into a LinkageTriple carrying
  // the row/column indices, the entry value as weight, and the key strings
  // found at those indices in `keys`.
  //
  // keys      - key strings; a key's position (via zipWithIndex) is its matrix index
  // simMatirx - coordinate matrix whose entries are converted
  // returns   - the collected triples, or null when the matrix column count
  //             differs from the number of keys

  // Guard: the matrix must have exactly one column per key.
  final long columnCount = simMatirx.numCols();
  final long keyCount = keys.count();
  if (columnCount != keyCount) {
    return null;
  }

  // Build the index -> key lookup by pairing each key with its position and
  // flipping the pair so the index becomes the join key.
  JavaRDD<Tuple2<Long, String>> indexedKeys = keys.zipWithIndex().map(new Function<Tuple2<String, Long>, Tuple2<Long, String>>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<Long, String> call(Tuple2<String, Long> keyWithIndex) {
      return new Tuple2<>(keyWithIndex._2(), keyWithIndex._1());
    }
  });
  JavaPairRDD<Long, String> keyByIndex = JavaPairRDD.fromJavaRDD(indexedKeys);

  // One triple per matrix entry, keyed by row index for the first join.
  JavaRDD<MatrixEntry> matrixEntries = simMatirx.entries().toJavaRDD();
  JavaPairRDD<Long, LinkageTriple> triplesByRow = matrixEntries.mapToPair(new PairFunction<MatrixEntry, Long, LinkageTriple>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<Long, LinkageTriple> call(MatrixEntry entry) {
      LinkageTriple linkage = new LinkageTriple();
      linkage.keyAId = entry.i();
      linkage.keyBId = entry.j();
      linkage.weight = entry.value();
      return new Tuple2<>(linkage.keyAId, linkage);
    }
  });

  // First join: fill in keyA from the row index, then re-key by column index.
  // The join is a left outer join, so a triple whose row index has no key is
  // kept with keyA left as it was.
  JavaRDD<Tuple2<LinkageTriple, Optional<String>>> rowJoined = triplesByRow.leftOuterJoin(keyByIndex).values();
  JavaPairRDD<Long, LinkageTriple> triplesByCol = rowJoined.mapToPair(new PairFunction<Tuple2<LinkageTriple, Optional<String>>, Long, LinkageTriple>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<Long, LinkageTriple> call(Tuple2<LinkageTriple, Optional<String>> joined) {
      LinkageTriple linkage = joined._1();
      Optional<String> rowKey = joined._2();
      if (rowKey.isPresent()) {
        linkage.keyA = rowKey.get();
      }
      return new Tuple2<>(linkage.keyBId, linkage);
    }
  });

  // Second join: fill in keyB from the column index, again keeping triples
  // that have no matching key.
  JavaRDD<Tuple2<LinkageTriple, Optional<String>>> colJoined = triplesByCol.leftOuterJoin(keyByIndex).values();
  JavaRDD<LinkageTriple> resolvedTriples = colJoined.map(new Function<Tuple2<LinkageTriple, Optional<String>>, LinkageTriple>() {
    private static final long serialVersionUID = 1L;

    @Override
    public LinkageTriple call(Tuple2<LinkageTriple, Optional<String>> joined) {
      LinkageTriple linkage = joined._1();
      Optional<String> colKey = joined._2();
      if (colKey.isPresent()) {
        linkage.keyB = colKey.get();
      }
      return linkage;
    }
  });

  // Gather the RDD contents into a List for the caller.
  return resolvedTriples.collect();
}