public DataBag exec()

in datafu-pig/src/main/java/datafu/pig/sets/SetDifference.java [124:190]


  /**
   * Computes the set difference of the input bags: the tuples of the first bag
   * that are not matched in any of the other bags, with repeated tuples emitted
   * only once.
   *
   * <p>
   * The bags are consumed in iteration order and merged through a priority queue,
   * and duplicates are detected by comparing each tuple with the previously
   * emitted one, so the result is only a true set difference when every bag is
   * already sorted (NOTE(review): sortedness is assumed, not checked here).
   * </p>
   *
   * @param input tuple with at least two fields, each of which must be either
   *              null or a DataBag; a null first bag yields an empty result
   * @return a bag holding the distinct tuples of the first bag that were not
   *         matched in the other bags; empty when the first bag is null or empty
   * @throws RuntimeException if fewer than two fields are supplied, or if a
   *                          non-null field is not a DataBag
   * @throws IOException propagated from the input tuple accessors and from the
   *                     bag-loading helper
   */
  public DataBag exec(Tuple input) throws IOException
  {
    if (input.size() < 2)
    {
      throw new RuntimeException("Expected at least two inputs, but found " + input.size());
    }

    for (Object o : input)
    {
      if (o != null && !(o instanceof DataBag))
      {
        throw new RuntimeException("Inputs must be bags");
      }
    }

    DataBag outputBag = bagFactory.newDefaultBag();

    DataBag bag1 = (DataBag)input.get(0);
    DataBag bag2 = (DataBag)input.get(1);

    // nothing to subtract from
    if (bag1 == null || bag1.size() == 0)
    {
      return outputBag;
    }

    // Optimization: with a single, empty bag to subtract there is nothing to
    // merge. The first bag is still copied rather than returned as-is so that
    // repeated tuples are dropped exactly as the general merge below would drop
    // them; otherwise the result would differ depending on how many empty bags
    // were passed.
    if (input.size() == 2 && (bag2 == null || bag2.size() == 0))
    {
      Tuple previous = null;
      for (Tuple current : bag1)
      {
        if (previous == null || current.compareTo(previous) != 0)
        {
          outputBag.add(current);
          previous = current;
        }
      }
      return outputBag;
    }

    PriorityQueue<Pair> pq = loadBags(input);

    // last tuple written to the output; null until the first one is emitted
    Tuple lastData = null;

    while (true)
    {
      // Inspect the head without removing it: countMatches is handed the queue
      // while the head is still in it.
      Pair nextPair = pq.peek();

      // Ignore data we've already emitted (explicit null guard instead of
      // relying on compareTo(null)). Only take data from the first bag, and
      // only when no other bag currently holds the same data.
      if ((lastData == null || nextPair.data.compareTo(lastData) != 0)
          && nextPair.index.equals(0)
          && countMatches(pq) == 0)
      {
        outputBag.add(nextPair.data);
        lastData = nextPair.data;
      }

      Pair p = pq.poll();

      // only put the bag back into the queue if it still has data
      if (p.hasNext())
      {
        p.next();
        pq.offer(p);
      }
      else if (p.index.equals(0))
      {
        // stop when we exhaust all elements from the first bag
        break;
      }
    }

    return outputBag;
  }