in src/main/java/org/apache/datasketches/pig/quantiles/DataToItemsSketch.java [313:356]
public Tuple exec(final Tuple inputTuple) throws IOException { //throws is in API
  // Merges everything found in the bag at field 0 of inputTuple into a single union and
  // returns a one-field Tuple holding the serialized result sketch as a DataByteArray.
  //
  // Each tuple of that outer bag is inspected at its field 0:
  //   - null                -> skipped
  //   - DataBag             -> every non-null field 0 of its inner tuples is converted with
  //                            extractValue() and fed to the union as a raw item
  //   - DataByteArray       -> assumed to be a serialized sketch and merged into the union
  //   - anything else       -> IllegalArgumentException
  //
  // A serialized empty sketch is returned when inputTuple is null or has no fields, when the
  // bag at field 0 is null, or when the union yields no result sketch.

  // A null outer bag is treated like any other empty input instead of failing with a
  // NullPointerException in the loop below.
  final DataBag outerBag = ((inputTuple != null) && (inputTuple.size() > 0))
      ? (DataBag) inputTuple.get(0)
      : null;

  if (outerBag != null) {
    // Use the configured k only when it is positive; otherwise let the library choose
    // (presumably its default k).
    final ItemsUnion<T> union = this.k_ > 0
        ? ItemsUnion.getInstance(this.k_, this.comparator_)
        : ItemsUnion.getInstance(this.comparator_);

    for (final Tuple dataTuple: outerBag) {
      final Object f0 = dataTuple.get(0);
      if (f0 == null) { continue; }
      if (f0 instanceof DataBag) {
        final DataBag innerBag = (DataBag) f0; // inputTuple.bag0.dataTupleN.f0:bag
        if (innerBag.size() == 0) { continue; }
        // If field 0 of a dataTuple is a Bag all innerTuples of this inner bag
        // will be passed into the union.
        // It is due to system bagged outputs from multiple mapper Initial functions.
        // The Intermediate stage was bypassed.
        for (final Tuple innerTuple: innerBag) {
          final Object value = innerTuple.get(0);
          if (value != null) {
            union.update(extractValue(value));
          }
        }
      } else if (f0 instanceof DataByteArray) { // inputTuple.bag0.dataTupleN.f0:DBA
        // If field 0 of a dataTuple is a DataByteArray we assume it is a sketch
        // due to system bagged outputs from multiple mapper Intermediate functions.
        // Each dataTuple.DBA:sketch will be merged into the union.
        final DataByteArray dba = (DataByteArray) f0;
        union.update(ItemsSketch.getInstance(Memory.wrap(dba.get()), this.comparator_, this.serDe_));
      } else {
        // Reached only when f0 is neither of the two supported types, so say so.
        throw new IllegalArgumentException("dataTuple.Field0: Is neither a DataBag nor a DataByteArray: "
            + f0.getClass().getName());
      }
    }

    // The result may be null (checked here); in that case fall through to the empty sketch.
    final ItemsSketch<T> resultSketch = union.getResultAndReset();
    if (resultSketch != null) {
      return tupleFactory_.newTuple(new DataByteArray(resultSketch.toByteArray(this.serDe_)));
    }
  }

  // return empty sketch
  final ItemsSketch<T> sketch = this.k_ > 0
      ? ItemsSketch.getInstance(this.k_, this.comparator_)
      : ItemsSketch.getInstance(this.comparator_);
  return tupleFactory_.newTuple(new DataByteArray(sketch.toByteArray(this.serDe_)));
}