in webindex/modules/data/src/main/java/webindex/data/spark/IndexUtil.java [108:152]
/**
 * Assembles the row/column to value entries of the Accumulo index from three inputs and returns
 * them as one RDD.
 *
 * <ul>
 * <li>For every non-empty page: a {@code p:<uri>} entry holding the page as JSON (only when the
 * page has outbound links), plus one {@code p:<linked uri>} inlink entry per outbound link whose
 * value is the link's anchor text.</li>
 * <li>For every entry of {@code uriMap}: a {@code t:} row, a domain-rank row and a
 * {@code p:<uri>} incoming-count entry, all carrying {@code linksTo} as the value.</li>
 * <li>For every entry of {@code domainMap}: a {@code d:<domain>} page-count entry.</li>
 * </ul>
 *
 * @param stats counters updated from inside the page transformation (empty pages, pages,
 *        external links)
 * @param pages pages to index
 * @param uriMap per-URI information, keyed by URI
 * @param domainMap per-domain count, keyed by domain
 * @return the union of all generated entries, marked to persist with
 *         {@code StorageLevel.DISK_ONLY()}
 */
public static JavaPairRDD<RowColumn, Bytes> createAccumuloIndex(IndexStats stats,
    JavaRDD<Page> pages, JavaPairRDD<String, UriInfo> uriMap,
    JavaPairRDD<String, Long> domainMap) {

  // Entries derived from the pages themselves.
  JavaPairRDD<RowColumn, Bytes> pageEntries = pages.flatMapToPair(page -> {
    List<Tuple2<RowColumn, Bytes>> entries = new ArrayList<>();
    if (page.isEmpty()) {
      stats.addEmpty(1);
      return entries;
    }

    stats.addPage(1);
    Set<Link> outbound = page.getOutboundLinks();
    stats.addExternalLinks(outbound.size());

    String pageUri = page.getUri();
    // The page document itself is only stored when it links somewhere.
    if (!outbound.isEmpty()) {
      addRCV(entries, "p:" + pageUri, Constants.PAGE_CUR_COL, gson.toJson(page));
    }
    // Each outbound link becomes an inlink entry on the target page's row.
    for (Link outLink : outbound) {
      addRCV(entries, "p:" + outLink.getUri(), new Column(Constants.INLINKS, pageUri),
          outLink.getAnchorText());
    }
    return entries;
  });

  // Entries derived from the per-URI information.
  JavaPairRDD<RowColumn, Bytes> uriEntries = uriMap.flatMapToPair(uriAndInfo -> {
    String uri = uriAndInfo._1();
    UriInfo uriInfo = uriAndInfo._2();
    List<Tuple2<RowColumn, Bytes>> entries = new ArrayList<>();

    String totalRow = "t:" + IndexClient.revEncodeLong(uriInfo.linksTo) + ":" + uri;
    addRCV(entries, totalRow, Column.EMPTY, uriInfo.linksTo);

    String reverseDomain = URL.fromUri(uri).getReverseDomain();
    String domainRankRow = IndexClient.encodeDomainRankUri(reverseDomain, uriInfo.linksTo, uri);
    addRCV(entries, domainRankRow, new Column(Constants.RANK, ""), uriInfo.linksTo);

    addRCV(entries, "p:" + uri, Constants.PAGE_INCOUNT_COL, uriInfo.linksTo);
    return entries;
  });
  JavaPairRDD<RowColumn, Bytes> pageAndUriEntries = pageEntries.union(uriEntries);

  // Entries derived from the per-domain counts.
  JavaPairRDD<RowColumn, Bytes> domainEntries = domainMap.mapToPair(domainAndCount -> {
    Column pageCountColumn = new Column(Constants.DOMAIN, Constants.PAGECOUNT);
    RowColumn key = new RowColumn("d:" + domainAndCount._1(), pageCountColumn);
    return new Tuple2<>(key, Bytes.of(domainAndCount._2() + ""));
  });

  JavaPairRDD<RowColumn, Bytes> accumuloIndex = pageAndUriEntries.union(domainEntries);
  accumuloIndex.persist(StorageLevel.DISK_ONLY());
  return accumuloIndex;
}