in src/main/java/org/apache/datasketches/pig/theta/Intersect.java [350:395]
public Tuple exec(final Tuple inputTuple) throws IOException { //throws is in API
final Intersection intersection = SetOperation.builder().setSeed(this.mySeed_).buildIntersection();
final DataBag outerBag = extractBag(inputTuple); //InputTuple.bag0
if (outerBag == null) { //must have non-empty outer bag at field 0.
return this.myEmptyCompactOrderedSketchTuple_;
}
//Bag is not empty.
for (Tuple dataTuple : outerBag) {
final Object f0 = extractFieldAtIndex(dataTuple, 0); //inputTuple.bag0.dataTupleN.f0
//must have non-null field zero
if (f0 == null) {
continue; //go to next dataTuple if there is one. If none, exception is thrown.
}
//f0 is not null
if (f0 instanceof DataBag) {
final DataBag innerBag = (DataBag)f0; //inputTuple.bag0.dataTupleN.f0:bag
if (innerBag.size() == 0) {
continue; //go to next dataTuple if there is one. If none, exception is thrown.
}
//If field 0 of a dataTuple is again a Bag all tuples of this inner bag
// will be passed into the union.
//It is due to system bagged outputs from multiple mapper Initial functions.
//The Intermediate stage was bypassed.
updateIntersection(innerBag, intersection, this.mySeed_); //process all tuples of innerBag
}
else if (f0 instanceof DataByteArray) { //inputTuple.bag0.dataTupleN.f0:DBA
//If field 0 of a dataTuple is a DataByteArray we assume it is a sketch from a prior call
//It is due to system bagged outputs from multiple mapper Intermediate functions.
// Each dataTuple.DBA:sketch will merged into the union.
final DataByteArray dba = (DataByteArray) f0;
final Memory srcMem = Memory.wrap(dba.get());
final Sketch sketch = Sketch.wrap(srcMem, this.mySeed_);
intersection.intersect(sketch);
}
else { // we should never get here.
throw new IllegalArgumentException("dataTuple.Field0: Is not a DataByteArray: "
+ f0.getClass().getName());
}
}
final CompactSketch compactSketch = intersection.getResult(true, null);
return compactOrderedSketchToTuple(compactSketch);
}