in datafu-pig/src/main/java/datafu/pig/sets/SetDifference.java [124:190]
/*
 * Computes the set difference of the first bag minus every other bag in the input tuple:
 * a tuple is emitted when it appears in the first bag and in none of the others.
 * Each distinct tuple is emitted at most once.
 *
 * The bags are walked together in ascending order through the priority queue built by
 * loadBags, so every bag is expected to be sorted in that same order.
 *
 * input: a tuple of at least two fields, each either null or a DataBag. A null first bag
 *        yields an empty result. Null later bags are assumed to be skipped by loadBags,
 *        i.e. treated as empty (TODO confirm).
 * returns: a new bag holding the distinct tuples of the first bag that no other bag contains.
 * throws RuntimeException if fewer than two inputs are given or a non-null input is not a bag.
 */
public DataBag exec(Tuple input) throws IOException
{
  if (input.size() < 2)
  {
    throw new RuntimeException("Expected at least two inputs, but found " + input.size());
  }
  for (Object o : input)
  {
    if (o != null && !(o instanceof DataBag))
    {
      throw new RuntimeException("Inputs must be bags");
    }
  }

  DataBag outputBag = bagFactory.newDefaultBag();
  DataBag bag1 = (DataBag)input.get(0);
  if (bag1 == null || bag1.size() == 0)
  {
    return outputBag;
  }
  // No shortcut for "second bag empty": returning bag1 as-is would skip the duplicate
  // elimination the merge below performs, so every input goes through the same path.

  PriorityQueue<Pair> pq = loadBags(input);
  boolean firstBagExhausted = false;
  // bag1 is non-empty, so its cursor stays in the queue until it is exhausted; the queue
  // is therefore never empty at the top of this loop.
  while (!firstBagExhausted)
  {
    Tuple current = pq.peek().data;
    boolean inFirstBag = false;
    boolean inOtherBag = false;

    // Consume every cursor positioned on a tuple equal to `current`, including repeated
    // copies within one bag (a re-offered cursor that still equals `current` is polled
    // again). Handling the whole run at once makes the result independent of how the
    // priority queue orders equal elements, which PriorityQueue leaves unspecified.
    while (!pq.isEmpty() && pq.peek().data.compareTo(current) == 0)
    {
      Pair p = pq.poll();
      if (p.index.equals(0))
      {
        inFirstBag = true;
      }
      else
      {
        inOtherBag = true;
      }

      // only put the bag back into the queue if it still has data
      if (p.hasNext())
      {
        p.next();
        pq.offer(p);
      }
      else if (p.index.equals(0))
      {
        // Finish the current run (other bags may still hold `current`), then stop:
        // nothing after the first bag's last element can be part of the result.
        firstBagExhausted = true;
      }
    }

    // Keep the tuple only if it came from the first bag and from no other bag.
    if (inFirstBag && !inOtherBag)
    {
      outputBag.add(current);
    }
  }
  return outputBag;
}