public static JavaPairRDD createAccumuloIndex()

in webindex/modules/data/src/main/java/webindex/data/spark/IndexUtil.java [108:152]


  /**
   * Builds the Accumulo index entries (row/column to value) from crawled pages plus the per-URI
   * and per-domain aggregates, and unions them into a single RDD.
   *
   * <p>Three groups of entries are produced:
   * <ul>
   * <li>From {@code pages}: every non-empty page that has at least one outbound link gets its JSON
   * form stored in row {@code "p:" + uri} under {@code Constants.PAGE_CUR_COL}. In addition, every
   * outbound link adds an entry in row {@code "p:" + linkTarget}, column
   * {@code (Constants.INLINKS, sourceUri)}, whose value is the link's anchor text.</li>
   * <li>From {@code uriMap}: for each URI, three entries are derived from its {@code linksTo}
   * count: a {@code "t:"} row built from {@code IndexClient.revEncodeLong(linksTo)} and the URI; a
   * domain-rank row from {@code IndexClient.encodeDomainRankUri}; and the count itself in row
   * {@code "p:" + uri} under {@code Constants.PAGE_INCOUNT_COL}.</li>
   * <li>From {@code domainMap}: row {@code "d:" + domain}, column
   * {@code (Constants.DOMAIN, Constants.PAGECOUNT)}, whose value is the decimal string of the
   * domain's count.</li>
   * </ul>
   *
   * <p>{@code stats} is updated inside a Spark transformation, and Spark evaluates transformations
   * lazily. Its empty-page, page and outbound-link counts are therefore only filled in once the
   * returned RDD is actually computed. NOTE(review): if {@code IndexStats} is accumulator-backed
   * (assumption), recomputed partitions could inflate those counts.
   *
   * @param stats counters incremented while pages are processed
   * @param pages crawled pages; empty pages are counted but produce no entries
   * @param uriMap URI to aggregated {@link UriInfo}
   * @param domainMap domain to its count (stored as the domain's page count)
   * @return the union of all index entries, marked for {@code StorageLevel.DISK_ONLY()}
   *         persistence (persisting does not itself trigger computation)
   */
  public static JavaPairRDD<RowColumn, Bytes> createAccumuloIndex(IndexStats stats,
      JavaRDD<Page> pages, JavaPairRDD<String, UriInfo> uriMap,
      JavaPairRDD<String, Long> domainMap) {

    // Page-derived entries: the page's own JSON plus one inlink entry per outbound link.
    JavaPairRDD<RowColumn, Bytes> accumuloIndex = pages.flatMapToPair(page -> {
      if (page.isEmpty()) {
        stats.addEmpty(1);
        return new ArrayList<>();
      }
      stats.addPage(1);
      Set<Link> outLinks = page.getOutboundLinks();
      stats.addExternalLinks(outLinks.size());

      List<Tuple2<RowColumn, Bytes>> ret = new ArrayList<>();
      String uri = page.getUri();
      // Only pages with at least one outbound link have their JSON stored here.
      if (!outLinks.isEmpty()) {
        addRCV(ret, "p:" + uri, Constants.PAGE_CUR_COL, gson.toJson(page));
      }
      for (Link link : outLinks) {
        addRCV(ret, "p:" + link.getUri(), new Column(Constants.INLINKS, uri), link.getAnchorText());
      }
      return ret;
    });

    // URI-derived entries: all three are keyed off the URI's linksTo count.
    accumuloIndex = accumuloIndex.union(uriMap.flatMapToPair(t -> {
      List<Tuple2<RowColumn, Bytes>> ret = new ArrayList<>();
      String uri = t._1();
      UriInfo uriInfo = t._2();
      addRCV(ret, "t:" + IndexClient.revEncodeLong(uriInfo.linksTo) + ":" + uri, Column.EMPTY,
          uriInfo.linksTo);
      String domain = URL.fromUri(uri).getReverseDomain();
      String domainRow = IndexClient.encodeDomainRankUri(domain, uriInfo.linksTo, uri);
      addRCV(ret, domainRow, new Column(Constants.RANK, ""), uriInfo.linksTo);
      addRCV(ret, "p:" + uri, Constants.PAGE_INCOUNT_COL, uriInfo.linksTo);
      return ret;
    }));

    // Domain-derived entries: one page-count cell per domain, stored as a decimal string.
    accumuloIndex = accumuloIndex.union(domainMap.mapToPair(t -> new Tuple2<>(
        new RowColumn("d:" + t._1(), new Column(Constants.DOMAIN, Constants.PAGECOUNT)),
        Bytes.of(String.valueOf(t._2())))));

    accumuloIndex.persist(StorageLevel.DISK_ONLY());

    return accumuloIndex;
  }