in datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSample.java [391:448]
public DataBag exec(Tuple input) throws IOException
{
  // Merges the partial results found in field 0 of the input into the final sample.
  //
  // Each partial tuple is read as follows:
  //   field 0 - sampling probability p (Double); only the first tuple's value is used
  //   field 1 - number of items covered by that partial (Long), summed into n
  //   field 3 - bag of items that were already selected
  //   field 4 - bag of wait-listed items in their intermediate (scored) form
  // Field 2 is not read by this method.
  //
  // The target sample size is ceil(p * n). Whatever the pre-selected items do not
  // cover is taken from the wait list, which is kept in a sorted bag ordered by
  // ScoredTupleComparator. Over- or under-selection is only reported on stderr;
  // the bag is returned as-is in either case.
  DataBag partials = (DataBag) input.get(0);

  DataBag selected = _BAG_FACTORY.newDefaultBag();
  DataBag waiting = _BAG_FACTORY.newSortedBag(ScoredTupleComparator.getInstance());

  double p = 0.0d; // the sampling probability
  long n = 0L;     // the size of the population (total number of items)
  boolean probabilityKnown = false;

  for (Tuple partial : partials)
  {
    if (!probabilityKnown)
    {
      // Only the first partial's probability is consulted.
      p = (Double) partial.get(0);
      probabilityKnown = true;
    }

    n += (Long) partial.get(1);
    selected.addAll((DataBag) partial.get(3));
    waiting.addAll((DataBag) partial.get(4));
  }

  // Snapshot the sizes before the wait list is drained into the selection.
  final long numSelected = selected.size();
  final long numWaiting = waiting.size();
  final long s = (long) Math.ceil(p * n); // sample size

  System.out.println("To sample " + s + " items from " + n + ", we pre-selected "
      + numSelected + ", and waitlisted " + numWaiting + ".");

  long numNeeded = s - numSelected;

  if (numNeeded < 0)
  {
    // More items were pre-selected than the sample requires; they are all kept.
    System.err.println("Pre-selected " + numSelected + " items, but only needed " + s
        + ".");
  }
  else if (numNeeded > 0)
  {
    // Top up the selection from the wait list, in the sorted bag's iteration order.
    for (Tuple scored : waiting)
    {
      selected.add(ScoredTuple.fromIntermediateTuple(scored).getTuple());
      numNeeded--;
      if (numNeeded == 0)
      {
        break;
      }
    }
  }

  if (numNeeded > 0)
  {
    // numNeeded now holds the shortfall left after the wait list was exhausted.
    System.err.println("The waiting list only has " + numWaiting
        + " items, but needed " + numNeeded + " more.");
  }

  return selected;
}