in datafu-pig/src/main/java/datafu/pig/bags/BagSplit.java [72:129]
/**
 * Splits the input bag into consecutive sub-bags of at most {@code maxSize} tuples each,
 * filled in the input bag's iteration order.
 *
 * <p>Each sub-bag is wrapped in its own output tuple. When {@code appendBagNum} is set, the
 * zero-based index of the sub-bag is appended to that tuple as an additional field.
 *
 * @param arg0 input tuple: field 0 is the maximum sub-bag size (any numeric value; a value
 *     {@code <= 0} behaves like 1), field 1 is the bag to split
 * @return a bag containing one tuple per sub-bag; empty if the input bag is empty
 * @throws IllegalArgumentException if field 0 is null or not numeric, or if field 1 is not a
 *     {@link DataBag}
 * @throws IOException propagated from reading fields of {@code arg0}
 */
public DataBag exec(Tuple arg0) throws IOException
{
  Object sizeArg = arg0.get(0);
  if (!(sizeArg instanceof Number))
  {
    throw new IllegalArgumentException("max size parameter must be a number");
  }
  // Unbox once, outside the loop. Accepting any Number lets callers pass a long (e.g. 10L)
  // as well as an int instead of failing with a ClassCastException.
  long maxSize = ((Number) sizeArg).longValue();

  Object o = arg0.get(1);
  if (!(o instanceof DataBag))
  {
    throw new IllegalArgumentException("parameter must be a databag");
  }
  DataBag inputBag = (DataBag) o;

  DataBag outputBag = bagFactory.newDefaultBag();
  DataBag currentBag = null;
  long count = 0;
  int numBags = 0;
  for (Tuple tuple : inputBag)
  {
    // Sub-bags are created lazily so that an input whose size is an exact multiple of
    // maxSize does not produce a trailing empty sub-bag.
    if (currentBag == null)
    {
      currentBag = bagFactory.newDefaultBag();
    }
    currentBag.add(tuple);
    count++;
    if (count >= maxSize)
    {
      outputBag.add(toOutputTuple(currentBag, numBags));
      numBags++;
      count = 0;
      currentBag = null;
    }
  }
  // Emit the final, partially filled sub-bag, if any.
  if (currentBag != null)
  {
    outputBag.add(toOutputTuple(currentBag, numBags));
  }
  return outputBag;
}

/**
 * Wraps a completed sub-bag in a new output tuple, appending {@code bagNum} as a second field
 * when {@code appendBagNum} is enabled.
 *
 * @param bag the sub-bag to wrap
 * @param bagNum zero-based index of this sub-bag within the output
 * @return the output tuple for {@code bag}
 */
private Tuple toOutputTuple(DataBag bag, int bagNum)
{
  Tuple result = tupleFactory.newTuple();
  result.append(bag);
  if (this.appendBagNum)
  {
    result.append(bagNum);
  }
  return result;
}