public DataBag exec()

in datafu-pig/src/main/java/datafu/pig/bags/BagSplit.java [72:129]


  public DataBag exec(Tuple arg0) throws IOException
  { 
    DataBag outputBag = bagFactory.newDefaultBag();
    
    Integer maxSize = (Integer)arg0.get(0);
    
    Object o = arg0.get(1);
    if (!(o instanceof DataBag))
      throw new RuntimeException("parameter must be a databag");
    
    DataBag inputBag = (DataBag)o;
    
    DataBag currentBag = null;
    
    int count = 0;
    int numBags = 0;
    for (Tuple tuple : inputBag)
    {
      if (currentBag == null)
      {
        currentBag = bagFactory.newDefaultBag();
      }
      
      currentBag.add(tuple);
      count++;
      
      if (count >= maxSize)
      {
        Tuple newTuple = tupleFactory.newTuple();
        newTuple.append(currentBag);
        
        if (this.appendBagNum)
        {
          newTuple.append(numBags);
        }
        
        numBags++;
        
        outputBag.add(newTuple);
        
        count = 0;
        currentBag = null;
      }
    }
    
    if (currentBag != null)
    {
      Tuple newTuple = tupleFactory.newTuple();
      newTuple.append(currentBag);
      if (this.appendBagNum)
      {
        newTuple.append(numBags);
      }
      outputBag.add(newTuple);
    }
        
    return outputBag;
  }