in webindex/modules/data/src/main/java/webindex/data/spark/IndexUtil.java [157:196]
/**
 * Builds the initial contents of the Fluo table from pre-computed Spark data.
 *
 * <p>The result is the union of three sources:
 * <ul>
 * <li>one {@code PAGE:CUR} entry per page, holding the page serialized with {@code gson}. It is
 * written under the page URI's hashed row, and only for pages that are non-empty and have at
 * least one outbound link;</li>
 * <li>the URI combine-queue entries produced from {@code uriMap};</li>
 * <li>the domain combine-queue entries produced from {@code domainMap}.</li>
 * </ul>
 *
 * <p>The returned RDD is persisted with {@link StorageLevel#DISK_ONLY()}.
 *
 * @param pages parsed pages to store
 * @param uriMap per-URI info used to seed the URI combine queue
 * @param domainMap per-domain counts used to seed the domain combine queue
 * @param numBuckets number of buckets passed to each combine-queue initializer
 * @return row/column to value pairs ready to be loaded into Fluo
 */
public static JavaPairRDD<RowColumn, Bytes> createFluoTable(JavaRDD<Page> pages,
    JavaPairRDD<String, UriInfo> uriMap, JavaPairRDD<String, Long> domainMap, int numBuckets) {
  KryoSimplerSerializer serializer = new KryoSimplerSerializer(new WebindexKryoFactory());

  JavaPairRDD<RowColumn, Bytes> fluoIndex = pages.flatMapToPair(page -> {
    List<Tuple2<RowColumn, Bytes>> ret = new ArrayList<>();
    // Short-circuit keeps getOutboundLinks() from being called on empty pages.
    if (!page.isEmpty() && !page.getOutboundLinks().isEmpty()) {
      String hashedRow = PageObserver.getPageRowHasher().addHash(page.getUri()).toString();
      addRCV(ret, hashedRow, new Column(Constants.PAGE, Constants.CUR), gson.toJson(page));
    }
    return ret;
  });

  Initializer<String, UriInfo> uriCombineQueueInitializer =
      CombineQueue.getInitializer(UriCombineQ.URI_COMBINE_Q_ID, numBuckets, serializer);
  fluoIndex = fluoIndex.union(toCombineQueueEntries(uriMap, uriCombineQueueInitializer));

  Initializer<String, Long> domainMapInitializer =
      CombineQueue.getInitializer(DomainCombineQ.DOMAIN_COMBINE_Q_ID, numBuckets, serializer);
  fluoIndex = fluoIndex.union(toCombineQueueEntries(domainMap, domainMapInitializer));

  fluoIndex.persist(StorageLevel.DISK_ONLY());
  return fluoIndex;
}

/**
 * Converts every key/value pair through a combine-queue initializer.
 *
 * <p>Each resulting row/column/value is repacked as a {@code (RowColumn, Bytes)} pair.
 */
private static <K, V> JavaPairRDD<RowColumn, Bytes> toCombineQueueEntries(
    JavaPairRDD<K, V> data, Initializer<K, V> initializer) {
  return data.mapToPair(t -> {
    RowColumnValue rcv = initializer.convert(t._1(), t._2());
    return new Tuple2<>(new RowColumn(rcv.getRow(), rcv.getColumn()), rcv.getValue());
  });
}