in datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSample.java [316:374]
public Tuple exec(Tuple input) throws IOException
{
  // Combines the partial results found in the bag at input field 0 into a
  // single tuple laid out exactly like each partial:
  //   (p, numItems, n1, selected bag, waiting bag)
  // Selected items are carried over unchanged; the waiting items of all
  // partials are pooled and re-checked against thresholds computed from the
  // combined n1.
  DataBag partials = (DataBag) input.get(0);
  DataBag accepted = _BAG_FACTORY.newDefaultBag();
  DataBag pooledWaiting = _BAG_FACTORY.newDefaultBag();

  double p = 0.0d; // taken from field 0 of the first partial only
  boolean probabilityRead = false;
  long numItems = 0L; // number of items processed, including rejected
  long n1 = 0L;

  for (Tuple partial : partials)
  {
    if (!probabilityRead)
    {
      p = (Double) partial.get(0);
      probabilityRead = true;
    }

    long partialItems = (Long) partial.get(1);
    numItems = numItems + partialItems;

    // NOTE(review): n1 is reassigned on every pass, so its final value is
    // max(field 2 of the LAST partial, total numItems) - presumably every
    // partial carries the same field 2; confirm against the producer.
    long partialBound = (Long) partial.get(2);
    n1 = Math.max(partialBound, numItems);

    DataBag partialSelected = (DataBag) partial.get(3);
    accepted.addAll(partialSelected);

    DataBag partialWaiting = (DataBag) partial.get(4);
    pooledWaiting.addAll(partialWaiting);
  }

  DataBag stillWaiting = _BAG_FACTORY.newDefaultBag();

  // With n1 == 0 no thresholds are computed and every pooled waiting item is
  // dropped from the output.
  if (n1 > 0L)
  {
    double acceptBelow = getQ1(n1, p);
    double waitBelow = getQ2(n1, p);

    for (Tuple candidate : pooledWaiting)
    {
      ScoredTuple scoredCandidate = ScoredTuple.fromIntermediateTuple(candidate);

      // Score under the first threshold: promote the unwrapped tuple.
      if (scoredCandidate.getScore() < acceptBelow)
      {
        accepted.add(scoredCandidate.getTuple());
        continue;
      }

      // Score between the thresholds: keep the intermediate (scored) form so
      // it can be re-evaluated later. Anything else is discarded.
      if (scoredCandidate.getScore() < waitBelow)
      {
        stillWaiting.add(candidate);
      }
    }
  }

  Tuple result = _TUPLE_FACTORY.newTuple();
  result.append(p);
  result.append(numItems);
  result.append(n1);
  result.append(accepted);
  result.append(stillWaiting);
  return result;
}