in core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java [223:352]
/**
 * Builds a document-by-word occurrence-count matrix from tokenized documents.
 *
 * <p>Every distinct word across all documents is assigned a column index with
 * {@code zipWithIndex}. Each document becomes one sparse row whose entry at a word's column
 * is the number of times that word occurs in the document's token list. A document whose
 * token list is empty produces no entries and therefore no row.
 *
 * @param uniqueDocRDD pairs of (document key, tokens of that document); if a key occurs more
 *     than once, its token counts are summed into a single row
 * @param sc Spark context (not used by this method)
 * @return a {@link LabeledRowMatrix} whose {@code rowMatrix} holds one sparse row per
 *     document, whose {@code rowkeys} are the document keys, and whose {@code colkeys} are the
 *     words in column-index order
 */
public static LabeledRowMatrix createDocWordMatrix(JavaPairRDD<String, List<String>> uniqueDocRDD, JavaSparkContext sc) {
  // Assign each distinct word a column index. This RDD is read three times (join, count,
  // keys().collect()). zipWithIndex over a shuffled RDD may hand out different indices when it
  // is re-evaluated, so cache it to keep column indices and colkeys consistent.
  JavaPairRDD<String, Long> wordIDRDD = uniqueDocRDD.values().flatMap(new FlatMapFunction<List<String>, String>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Iterator<String> call(List<String> words) throws Exception {
      return words.iterator();
    }
  }).distinct().zipWithIndex().cache();

  // ((doc, word), number of occurrences of word in doc)
  JavaPairRDD<Tuple2<String, String>, Double> docwordNumRDD = uniqueDocRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<String, List<String>>, Tuple2<String, String>, Double>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Iterator<Tuple2<Tuple2<String, String>, Double>> call(Tuple2<String, List<String>> docwords) throws Exception {
      List<Tuple2<Tuple2<String, String>, Double>> pairs = new ArrayList<>();
      for (String word : docwords._2) {
        Tuple2<String, String> docWord = new Tuple2<>(docwords._1, word);
        pairs.add(new Tuple2<Tuple2<String, String>, Double>(docWord, 1.0));
      }
      return pairs.iterator();
    }
  }).reduceByKey(new Function2<Double, Double, Double>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Double call(Double first, Double second) throws Exception {
      return first + second;
    }
  });

  // Re-key by word so the counts can be joined with the word index: (word, (doc, count))
  JavaPairRDD<String, Tuple2<String, Double>> wordDocnumRDD = docwordNumRDD.mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Double>, String, Tuple2<String, Double>>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, Tuple2<String, Double>> call(Tuple2<Tuple2<String, String>, Double> docWordCount) throws Exception {
      Tuple2<String, Double> docCount = new Tuple2<>(docWordCount._1._1, docWordCount._2);
      return new Tuple2<>(docWordCount._1._2, docCount);
    }
  });

  JavaPairRDD<String, Tuple2<Tuple2<String, Double>, Optional<Long>>> testRDD = wordDocnumRDD.leftOuterJoin(wordIDRDD);
  // toIntExact fails loudly instead of silently wrapping if the vocabulary exceeds int range.
  final int wordsize = Math.toIntExact(wordIDRDD.count());

  JavaPairRDD<String, Vector> docVectorRDD = testRDD.mapToPair(new PairFunction<Tuple2<String, Tuple2<Tuple2<String, Double>, Optional<Long>>>, String, Tuple2<List<Long>, List<Double>>>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, Tuple2<List<Long>, List<Double>>> call(Tuple2<String, Tuple2<Tuple2<String, Double>, Optional<Long>>> arg0) throws Exception {
      Optional<Long> oid = arg0._2._2;
      // wordIDRDD is derived from the same token lists, so every word is expected to have an id;
      // the 0 fallback is kept from the original logic.
      Long wordId = (long) 0;
      if (oid.isPresent()) {
        wordId = oid.get();
      }
      List<Long> word = new ArrayList<>();
      word.add(wordId);
      List<Double> count = new ArrayList<>();
      count.add(arg0._2._1._2);
      Tuple2<List<Long>, List<Double>> wordcount = new Tuple2<>(word, count);
      return new Tuple2<>(arg0._2._1._1, wordcount);
    }
  }).reduceByKey(new Function2<Tuple2<List<Long>, List<Double>>, Tuple2<List<Long>, List<Double>>, Tuple2<List<Long>, List<Double>>>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<List<Long>, List<Double>> call(Tuple2<List<Long>, List<Double>> arg0, Tuple2<List<Long>, List<Double>> arg1) throws Exception {
      // The lists were freshly allocated by the map step above, so appending in place is safe.
      arg0._1.addAll(arg1._1);
      arg0._2.addAll(arg1._2);
      return new Tuple2<>(arg0._1, arg0._2);
    }
  }).mapToPair(new PairFunction<Tuple2<String, Tuple2<List<Long>, List<Double>>>, String, Vector>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, Vector> call(Tuple2<String, Tuple2<List<Long>, List<Double>>> arg0) throws Exception {
      return new Tuple2<>(arg0._1, buildSortedSparseVector(wordsize, arg0._2._1, arg0._2._2));
    }
  }).cache(); // read twice below (rows and rowkeys); caching keeps their order aligned

  RowMatrix docwordMatrix = new RowMatrix(docVectorRDD.values().rdd());
  LabeledRowMatrix labeledRowMatrix = new LabeledRowMatrix();
  labeledRowMatrix.rowMatrix = docwordMatrix;
  labeledRowMatrix.rowkeys = docVectorRDD.keys().collect();
  labeledRowMatrix.colkeys = wordIDRDD.keys().collect();
  return labeledRowMatrix;
}

/**
 * Builds a sparse vector from parallel lists of column indices and values, with entries
 * ordered by ascending index (sparse vectors expect strictly increasing indices). Values
 * that share an index are summed.
 *
 * @param size total vector length
 * @param wordIds column index of each entry; each must be in {@code [0, size)}
 * @param counts value of each entry, parallel to {@code wordIds}
 * @return the sparse vector
 */
private static Vector buildSortedSparseVector(int size, List<Long> wordIds, List<Double> counts) {
  java.util.TreeMap<Integer, Double> byIndex = new java.util.TreeMap<>();
  for (int i = 0; i < wordIds.size(); i++) {
    Integer index = wordIds.get(i).intValue();
    double value = counts.get(i);
    Double previous = byIndex.get(index);
    byIndex.put(index, previous == null ? value : previous + value);
  }
  int[] indices = new int[byIndex.size()];
  double[] values = new double[byIndex.size()];
  int pos = 0;
  for (java.util.Map.Entry<Integer, Double> entry : byIndex.entrySet()) {
    indices[pos] = entry.getKey();
    values[pos] = entry.getValue();
    pos++;
  }
  return Vectors.sparse(size, indices, values);
}