in src/main/java/org/apache/datasketches/pig/sampling/ReservoirSampling.java [202:231]
public Tuple exec(final Tuple inputTuple) throws IOException {
  // Builds a 3-field result tuple: (record count of the input bag, k, bag of samples).
  // Returns null when the input tuple is null, has no fields, or its first field is null.
  final boolean hasBag =
      (inputTuple != null) && (inputTuple.size() >= 1) && !inputTuple.isNull(0);
  if (!hasBag) {
    return null;
  }

  final DataBag inputBag = (DataBag) inputTuple.get(0);
  final DataBag sampleBag;
  final int reportedK;

  if (inputBag.size() > this.targetK_) {
    // More records than targetK_: push every tuple through a reservoir sketch.
    final ReservoirItemsSketch<Tuple> sketch = ReservoirItemsSketch.newInstance(this.targetK_);
    for (final Tuple record : inputBag) {
      sketch.update(record);
    }
    // newDefaultBag(List<Tuple>) does *not* copy values
    final List<Tuple> rawSamples = SamplingPigUtil.getRawSamplesAsList(sketch);
    sampleBag = BagFactory.getInstance().newDefaultBag(rawSamples);
    reportedK = sketch.getK();
  } else {
    // Input has at most targetK_ records: pass the input bag through unchanged.
    sampleBag = inputBag;
    reportedK = this.targetK_;
  }

  final Tuple result = TupleFactory.getInstance().newTuple(3);
  result.set(0, inputBag.size());
  result.set(1, reportedK);
  result.set(2, sampleBag);
  return result;
}