public Tuple exec()

in src/main/java/org/apache/datasketches/pig/quantiles/UnionItemsSketch.java [298:336]


    /**
     * Merges the contents of the bag found at field 0 of the input tuple into a single union
     * and returns the resulting sketch in serialized form.
     *
     * <p>Each tuple of that outer bag is dispatched on the type of its own field 0:</p>
     * <ul>
     * <li>{@code null}, or an empty inner bag: skipped;</li>
     * <li>{@code DataBag}: handed to {@code updateUnion} together with the union;</li>
     * <li>{@code DataByteArray}: treated as a serialized sketch and merged into the union;</li>
     * <li>any other type: rejected with an {@code IllegalArgumentException}.</li>
     * </ul>
     *
     * <p>When the input tuple is null or has no fields, when its field 0 is null, or when the
     * union yields no result sketch, a newly created empty sketch is serialized and returned
     * instead.</p>
     *
     * @param inputTuple tuple whose field 0 is expected to be a {@code DataBag}; may be null
     * @return a tuple wrapping a {@code DataByteArray} that holds the serialized sketch
     * @throws IOException declared by the signature (presumably raised by tuple field access)
     * @throws IllegalArgumentException if field 0 of a data tuple is neither a {@code DataBag}
     *     nor a {@code DataByteArray}
     */
    public Tuple exec(final Tuple inputTuple) throws IOException {
      if (inputTuple != null && inputTuple.size() > 0) {
        final ItemsUnion<T> union = this.k_ > 0
            ? ItemsUnion.getInstance(this.k_, this.comparator_)
            : ItemsUnion.getInstance(this.comparator_);
        final DataBag outerBag = (DataBag) inputTuple.get(0);
        // A null field 0 means there is nothing to merge. Without this guard the for-each
        // below would throw a NullPointerException; instead fall through to the empty sketch,
        // exactly as for a null or zero-sized input tuple.
        if (outerBag != null) {
          for (final Tuple dataTuple: outerBag) {
            final Object f0 = dataTuple.get(0);
            if (f0 == null) { continue; }
            if (f0 instanceof DataBag) {
              final DataBag innerBag = (DataBag) f0; //inputTuple.bag0.dataTupleN.f0:bag
              if (innerBag.size() == 0) { continue; }
              // If field 0 of a dataTuple is again a Bag, all tuples of this inner bag
              // will be passed into the union.
              // It is due to system bagged outputs from multiple mapper Initial functions.
              // The Intermediate stage was bypassed.
              updateUnion(innerBag, union, this.comparator_, this.serDe_);
            } else if (f0 instanceof DataByteArray) { //inputTuple.bag0.dataTupleN.f0:DBA
              // If field 0 of a dataTuple is a DataByteArray we assume it is a sketch from a
              // prior call. It is due to system bagged outputs from multiple mapper
              // Intermediate functions. Each dataTuple.DBA:sketch is merged into the union.
              final DataByteArray dba = (DataByteArray) f0;
              union.update(
                  ItemsSketch.getInstance(Memory.wrap(dba.get()), this.comparator_, this.serDe_));
            } else {
              throw new IllegalArgumentException("dataTuple.Field0: Is not a DataByteArray: "
                + f0.getClass().getName());
            }
          }
          final ItemsSketch<T> resultSketch = union.getResultAndReset();
          // The result is null-checked; a null result falls through to the empty sketch below.
          if (resultSketch != null) {
            return tupleFactory_.newTuple(new DataByteArray(resultSketch.toByteArray(this.serDe_)));
          }
        }
      }
      // No usable input or no result from the union: return a serialized empty sketch,
      // honoring the configured k when it is positive.
      final ItemsSketch<T> sketch = this.k_ > 0
          ? ItemsSketch.getInstance(this.k_, this.comparator_)
          : ItemsSketch.getInstance(this.comparator_);
      return tupleFactory_.newTuple(new DataByteArray(sketch.toByteArray(this.serDe_)));
    }