public static JavaPairRDD createFluoTable()

in webindex/modules/data/src/main/java/webindex/data/spark/IndexUtil.java [157:196]


  /**
   * Builds the initial Fluo table contents as an RDD of row/column to value entries.
   *
   * <p>The result is the union, in this order, of:
   * <ul>
   * <li>one entry per non-empty page that has at least one outbound link. Its row is produced by
   * {@code PageObserver.getPageRowHasher()} from the page URI, its column is
   * {@code Constants.PAGE}/{@code Constants.CUR}, and its value is the page serialized with
   * {@code gson};</li>
   * <li>every {@code uriMap} entry, converted by the initializer of the URI combine queue;</li>
   * <li>every {@code domainMap} entry, converted by the initializer of the domain combine
   * queue.</li>
   * </ul>
   *
   * <p>The returned RDD has already been persisted with {@code StorageLevel.DISK_ONLY()}.
   *
   * @param pages pages to index; empty pages and pages without outbound links produce no entry
   * @param uriMap entries to load into the URI combine queue
   * @param domainMap entries to load into the domain combine queue
   * @param numBuckets bucket count passed to both combine queue initializers
   * @return the persisted RDD of Fluo row/column to value entries
   */
  public static JavaPairRDD<RowColumn, Bytes> createFluoTable(JavaRDD<Page> pages,
      JavaPairRDD<String, UriInfo> uriMap, JavaPairRDD<String, Long> domainMap, int numBuckets) {

    KryoSimplerSerializer serializer = new KryoSimplerSerializer(new WebindexKryoFactory());

    JavaPairRDD<RowColumn, Bytes> fluoIndex = pages.flatMapToPair(page -> {
      List<Tuple2<RowColumn, Bytes>> ret = new ArrayList<>();
      // Empty pages and pages without outbound links contribute nothing. The short-circuit keeps
      // getOutboundLinks() from being called on empty pages, exactly as before.
      // NOTE(review): skipping link-less pages looks deliberate; confirm with the page loader.
      if (page.isEmpty() || page.getOutboundLinks().isEmpty()) {
        return ret;
      }
      String hashedRow = PageObserver.getPageRowHasher().addHash(page.getUri()).toString();
      addRCV(ret, hashedRow, new Column(Constants.PAGE, Constants.CUR), gson.toJson(page));
      return ret;
    });

    Initializer<String, UriInfo> uriCombineQueueInitializer =
        CombineQueue.getInitializer(UriCombineQ.URI_COMBINE_Q_ID, numBuckets, serializer);
    fluoIndex = fluoIndex.union(toFluoEntries(uriMap, uriCombineQueueInitializer));

    Initializer<String, Long> domainMapInitializer =
        CombineQueue.getInitializer(DomainCombineQ.DOMAIN_COMBINE_Q_ID, numBuckets, serializer);
    fluoIndex = fluoIndex.union(toFluoEntries(domainMap, domainMapInitializer));

    fluoIndex.persist(StorageLevel.DISK_ONLY());

    return fluoIndex;
  }

  /**
   * Converts each key/value pair of {@code data} into a Fluo row/column to value entry using the
   * given combine queue initializer.
   *
   * <p>Static so that the Spark closure captures only {@code initializer} and no enclosing
   * instance.
   *
   * @param data key/value pairs to convert
   * @param initializer combine queue initializer that maps a key/value pair to a row/column/value
   * @return an RDD with one entry per input pair
   */
  private static <K, V> JavaPairRDD<RowColumn, Bytes> toFluoEntries(JavaPairRDD<K, V> data,
      Initializer<K, V> initializer) {
    return data.mapToPair(t -> {
      RowColumnValue rcv = initializer.convert(t._1(), t._2());
      return new Tuple2<>(new RowColumn(rcv.getRow(), rcv.getColumn()), rcv.getValue());
    });
  }