in core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java [367:409]
/**
 * Loads labelled dense vectors from a comma-separated text file.
 *
 * <p>The file is read with {@code spark.sc.textFile}, every line is paired with its
 * zero-based position via {@code zipWithIndex}, and the first {@code skipNum} lines
 * (typically the header) are discarded. Each remaining line is split on {@code ","}:
 * the first field becomes the key, and the fields from index 1 up to, but not
 * including, the last field are parsed as doubles into a dense vector. Lines with
 * fewer than three fields therefore yield an empty vector.
 *
 * @param spark       driver whose {@code sc} context is used to read the file
 * @param csvFileName path of the CSV file to load
 * @param skipNum     number of leading lines to skip; lines whose index is
 *                    {@code >= skipNum} are kept
 * @return a pair RDD mapping the first field of each kept line to its vector, or
 *         {@code null} when no line remains after skipping
 */
public static JavaPairRDD<String, Vector> loadVectorFromCSV(SparkDriver spark, String csvFileName, int skipNum) {
  // Skipping the leading line(s) (header) is important: they are not numeric.
  JavaPairRDD<String, Long> dataLines = spark.sc.textFile(csvFileName)
      .zipWithIndex()
      .filter(indexedLine -> indexedLine._2 >= skipNum);

  // isEmpty() stops at the first surviving line, whereas count() would scan the
  // whole file just to learn whether anything is there.
  if (dataLines.isEmpty()) {
    return null;
  }

  return dataLines.mapToPair(indexedLine -> {
    String[] fields = indexedLine._1.split(",");
    String word = fields[0];

    int from = 1;
    // NOTE(review): Arrays.copyOfRange treats its upper bound as exclusive, so with
    // "length - 1" the last field of every line is never parsed. This looks like an
    // off-by-one, but it is kept as-is because callers may rely on it (e.g. a
    // trailing non-numeric column) -- confirm against the CSV producer.
    int to = fields.length - 1;
    if (fields.length < 2) {
      // Key-only line: use an empty range rather than an invalid (from > to) one.
      from = 0;
      to = 0;
    }

    // Double.parseDouble throws NumberFormatException on a non-numeric field.
    double[] values = Stream.of(Arrays.copyOfRange(fields, from, to))
        .mapToDouble(Double::parseDouble)
        .toArray();
    return new Tuple2<>(word, Vectors.dense(values));
  });
}