in datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSample.java [196:299]
public Tuple exec(Tuple input) throws IOException
{
  /*
   * Samples the bag carried in field 0 of the input tuple.
   *
   * Accepted input layouts:
   *   1 field  - (bag); the sampling probability is taken from _p
   *              (kept for backward compatibility),
   *   2 fields - (bag, probability),
   *   3 fields - (bag, probability, lower bound or exact size of the population).
   *
   * The probability and, when supplied, the lower bound are remembered on the
   * first call; every later call must pass exactly the same values.
   *
   * Throws IllegalArgumentException when the field count is unsupported, when
   * the one-field form is used while _p is negative, or when the probability
   * or the lower bound differs from the value remembered on the first call.
   */
  final int fieldCount = input.size();

  // The one-field case exists only for backward compatibility and should be
  // removed once the sampling probability can no longer be given in the constructor.
  switch (fieldCount)
  {
    case 1:
      if (_p < 0.0d)
      {
        throw new IllegalArgumentException("Sampling probability is not given.");
      }
      break;
    case 2:
    case 3:
      break;
    default:
      throw new IllegalArgumentException("The input tuple should have either two or three fields: "
          + "a bag of items, the sampling probability, "
          + "and optionally a good lower bound of the size of the population or the exact number.");
  }

  final DataBag bag = (DataBag) input.get(0);
  final long bagSize = bag.size();
  // Running total of items seen so far by this instance.
  _localCount += bagSize;

  // Also for backward compatibility: once the constructor form is gone this
  // should always read the probability from field 1.
  final double p;
  if (fieldCount == 1)
  {
    p = _p;
  }
  else
  {
    p = ((Number) input.get(1)).doubleValue();
  }

  if (!_first)
  {
    if (p != _p)
    {
      throw new IllegalArgumentException("The sampling probability must be a scalar, but found two different values: "
          + _p + " and " + p + ".");
    }
  }
  else
  {
    // First call: remember the probability, then validate it.
    _p = p;
    verifySamplingProbability(p);
  }

  long lowerBound = 0L;
  // After validation the field count is 1, 2 or 3, so only the three-field
  // form carries a lower bound.
  if (fieldCount == 3)
  {
    lowerBound = ((Number) input.get(2)).longValue();
    if (_first)
    {
      _n1 = lowerBound;
    }
    else if (lowerBound != _n1)
    {
      throw new IllegalArgumentException("The lower bound of the population size must be a scalar, but found two different values: "
          + _n1 + " and " + lowerBound + ".");
    }
  }

  _first = false;

  // Fall back to the local count whenever the supplied lower bound is smaller.
  lowerBound = Math.max(lowerBound, _localCount);

  final DataBag accepted = _BAG_FACTORY.newDefaultBag();
  final DataBag waitlisted = _BAG_FACTORY.newDefaultBag();

  if (lowerBound > 0L)
  {
    final double acceptBelow = getQ1(lowerBound, p);
    final double waitlistBelow = getQ2(lowerBound, p);

    for (Tuple item : bag)
    {
      // Exactly one random draw per item; the draw doubles as the item's score.
      final double score = nextDouble();
      if (score < acceptBelow)
      {
        accepted.add(item);
        continue;
      }
      if (score < waitlistBelow)
      {
        waitlisted.add(new ScoredTuple(score, item).getIntermediateTuple(_TUPLE_FACTORY));
      }
      // Items scoring at or above both thresholds are dropped.
    }
  }

  /*
   * Output layout, in order:
   *   sampling probability (double),
   *   number of items processed from this input tuple (long),
   *   lower bound of the population size, or the exact number (long),
   *   bag of selected items,
   *   bag of waitlisted items together with their scores.
   */
  final Tuple result = _TUPLE_FACTORY.newTuple();
  result.append(p);
  result.append(bagSize);
  result.append(lowerBound);
  result.append(accepted);
  result.append(waitlisted);
  return result;
}