public void accumulate()

in datafu-pig/src/main/java/datafu/pig/linkanalysis/PageRank.java [242:293]


  public void accumulate(Tuple t) throws IOException
  {
    if (aborted)
    {
      return;
    }
    
    DataBag bag = (DataBag) t.get(0);
    if (bag == null || bag.size() == 0)
      return;
    
    for (Tuple sourceTuple : bag) 
    {
      Integer sourceId = (Integer)sourceTuple.get(0);
      DataBag edges = (DataBag)sourceTuple.get(1);
      Double nodeBias = null;
      if (enableNodeBiasing)
      {
        nodeBias = (Double)sourceTuple.get(2);
      }

      ArrayList<Map<String,Object>> edgesMapList = new ArrayList<Map<String, Object>>();

      for (Tuple edgeTuple : edges)
      {
        Integer destId = (Integer)edgeTuple.get(0);
        Double weight = (Double)edgeTuple.get(1);
        HashMap<String,Object> edgeMap = new HashMap<String, Object>();
        edgeMap.put("dest",destId);
        edgeMap.put("weight",weight);
        edgesMapList.add(edgeMap);
      }

      if (enableNodeBiasing)
      {
        graph.addNode(sourceId, edgesMapList, nodeBias.floatValue());
      }
      else
      {
        graph.addNode(sourceId, edgesMapList);
      }

      if (graph.nodeCount() + graph.edgeCount() > maxNodesAndEdges)
      {
        System.out.println(String.format("There are too many nodes and edges (%d + %d > %d). Aborting.", graph.nodeCount(), graph.edgeCount(), maxNodesAndEdges));
        aborted = true;
        break;
      }

      reporter.progress();
    }
  }