in src/main/java/org/apache/datasketches/pig/sampling/ReservoirUnion.java [73:96]
public void accumulate(final Tuple inputTuple) throws IOException {
  // Merges every reservoir found in the bag at field 0 of inputTuple into union_.
  // Each bag element is expected to be a tuple of (n, k, samples), read as a long, an int and a
  // DataBag respectively. A null input tuple, an empty one, or one whose field 0 is null is
  // ignored. Malformed element tuples (null or fewer than 3 fields) raise an ExecException.
  if (inputTuple == null || inputTuple.size() < 1 || inputTuple.isNull(0)) {
    return;
  }
  final DataBag reservoirs = (DataBag) inputTuple.get(0);
  if (this.union_ == null) {
    // Created lazily on the first non-trivial input, sized by maxK_.
    this.union_ = ReservoirItemsUnion.newInstance(this.maxK_);
  }
  try {
    for (final Tuple t : reservoirs) {
      // Check the element shape explicitly. Without this, a null tuple would surface as an
      // uncaught NullPointerException rather than the ExecException used for bad reservoirs.
      if (t == null || t.size() < 3) {
        throw new ExecException("Cannot update union with given reservoir");
      }
      // NOTE(review): these casts unbox from Long/Integer, so a field stored with a different
      // boxed type (e.g. an Integer for n) still fails with ClassCastException; confirm callers
      // always serialize n as a long and k as an int.
      final long n = (long) t.get(0);
      final int k = (int) t.get(1);
      final DataBag sampleBag = (DataBag) t.get(2);
      final ArrayList<Tuple> samples = ReservoirSampling.dataBagToArrayList(sampleBag);
      this.union_.update(n, k, samples);
    }
  } catch (final IndexOutOfBoundsException e) {
    // Kept as a safety net for any out-of-range access that escapes the shape check above.
    throw new ExecException("Cannot update union with given reservoir", e);
  }
}