in src/main/java/org/apache/datasketches/pig/sampling/ReservoirSampling.java [255:287]
/**
 * Merges a bag of partial reservoirs into a single reservoir tuple.
 *
 * <p>Field 0 of {@code inputTuple} is read as a {@link DataBag}. Each tuple in that bag is
 * read as {@code (long n, int k, DataBag samples)} and folded into a
 * {@link ReservoirItemsUnion} created with {@code targetK_}.
 *
 * @param inputTuple tuple whose field 0 holds the bag of partial reservoirs
 * @return {@code null} if {@code inputTuple} is null, has no fields, or has a null field 0;
 *         otherwise a 3-field tuple {@code (n, k, sampleBag)} taken from the union result.
 *         When the union received no items at all, the tuple is
 *         {@code (0L, targetK_, <empty bag>)}.
 * @throws IOException propagated from reading tuple fields
 */
public Tuple exec(final Tuple inputTuple) throws IOException {
  if ((inputTuple == null) || (inputTuple.size() < 1) || inputTuple.isNull(0)) {
    return null;
  }

  final ReservoirItemsUnion<Tuple> union = ReservoirItemsUnion.newInstance(this.targetK_);
  final DataBag outerBag = (DataBag) inputTuple.get(0);
  for (Tuple reservoir : outerBag) {
    final long n = (long) reservoir.get(0);
    final int k = (int) reservoir.get(1);
    if ((n <= k) && (k <= this.targetK_)) {
      // NOTE(review): n <= k presumably means the partial reservoir still holds every item
      // it saw, so the items can be replayed one by one -- confirm against the producer.
      for (Tuple t : (DataBag) reservoir.get(2)) {
        union.update(t);
      }
    } else {
      // Otherwise hand the whole reservoir (with its n and k) to the union in one call.
      final ArrayList<Tuple> samples = dataBagToArrayList((DataBag) reservoir.get(2));
      union.update(n, k, samples);
    }
  }

  // getResult() returns null when the union has not been given any item (empty outer bag,
  // or only empty partial reservoirs). Dereferencing it would throw NullPointerException,
  // so fall back to an empty reservoir in that case.
  final ReservoirItemsSketch<Tuple> result = union.getResult();
  final long totalN;
  final int resultK;
  final ArrayList<Tuple> data;
  if (result == null) {
    totalN = 0L;
    resultK = this.targetK_;
    data = new ArrayList<Tuple>();
  } else {
    totalN = result.getN();
    resultK = result.getK();
    data = SamplingPigUtil.getRawSamplesAsList(result);
  }

  final DataBag sampleBag = BagFactory.getInstance().newDefaultBag(data);
  final Tuple output = TupleFactory.getInstance().newTuple(3);
  output.set(0, totalN);
  output.set(1, resultK);
  output.set(2, sampleBag);
  return output;
}