in datafu-pig/src/main/java/datafu/pig/linkanalysis/PageRank.java [242:293]
/**
 * Adds the nodes carried by one input tuple to {@code graph}.
 *
 * <p>Field 0 of {@code t} is read as a bag of node tuples. Each node tuple is read as:
 * <ul>
 *   <li>field 0: the source node id ({@code Integer})</li>
 *   <li>field 1: a bag of edge tuples, each holding a destination id ({@code Integer})
 *       and a weight ({@code Double}); a null bag is treated like an empty one</li>
 *   <li>field 2: the node bias ({@code Double}), read only when {@code enableNodeBiasing}
 *       is set</li>
 * </ul>
 *
 * <p>After each node is added, the combined node and edge count is compared against
 * {@code maxNodesAndEdges}. Once it is exceeded a message is printed, {@code aborted} is set,
 * the remaining nodes of this call are skipped, and every later call returns immediately.
 *
 * @param t input tuple whose first field is the bag of node tuples; a null or empty bag is a no-op
 * @throws IOException declared by the method signature; may be raised while reading tuple fields
 */
public void accumulate(Tuple t) throws IOException
{
  if (aborted)
  {
    return;
  }

  DataBag bag = (DataBag) t.get(0);
  if (bag == null || bag.size() == 0)
  {
    return;
  }

  for (Tuple sourceTuple : bag)
  {
    Integer sourceId = (Integer) sourceTuple.get(0);
    DataBag edges = (DataBag) sourceTuple.get(1);
    // The bias field is only read when biasing is enabled.
    Double nodeBias = enableNodeBiasing ? (Double) sourceTuple.get(2) : null;

    ArrayList<Map<String,Object>> edgesMapList = new ArrayList<Map<String,Object>>();

    // A null edges bag is handled the same way as an empty one (node with no outgoing
    // edges) instead of failing with a NullPointerException while iterating.
    if (edges != null)
    {
      for (Tuple edgeTuple : edges)
      {
        Integer destId = (Integer) edgeTuple.get(0);
        Double weight = (Double) edgeTuple.get(1);

        Map<String,Object> edgeMap = new HashMap<String,Object>();
        edgeMap.put("dest", destId);
        edgeMap.put("weight", weight);
        edgesMapList.add(edgeMap);
      }
    }

    if (enableNodeBiasing)
    {
      // NOTE(review): a null bias value still fails here with a NullPointerException;
      // no default is substituted because the intended default is not visible from this method.
      graph.addNode(sourceId, edgesMapList, nodeBias.floatValue());
    }
    else
    {
      graph.addNode(sourceId, edgesMapList);
    }

    // The limit is checked after the node has been added, so the graph may already
    // exceed maxNodesAndEdges at the moment processing is aborted.
    if (graph.nodeCount() + graph.edgeCount() > maxNodesAndEdges)
    {
      System.out.println(String.format("There are too many nodes and edges (%d + %d > %d). Aborting.", graph.nodeCount(), graph.edgeCount(), maxNodesAndEdges));
      aborted = true;
      break;
    }

    // Report progress once per node that did not trigger the abort.
    reporter.progress();
  }
}