private static DataByteArray process()

in src/main/java/org/apache/datasketches/pig/kll/DataToSketch.java [329:359]


  /**
   * Builds a KLL floats sketch from the contents of the given tuple and returns it serialized.
   *
   * <p>Field 0 of {@code inputTuple} is cast to a {@link DataBag} of data tuples. Field 0 of each
   * data tuple is handled according to its runtime type:</p>
   * <ul>
   * <li>{@code null}: the data tuple is skipped;</li>
   * <li>{@link DataBag}: field 0 of every inner tuple is cast to {@link Float} and used to update
   * the sketch; null values are skipped;</li>
   * <li>{@link DataByteArray}: the bytes are heapified as a sketch and merged into the result.</li>
   * </ul>
   *
   * @param inputTuple tuple whose field 0 holds the outer bag; if it is null, empty, or its
   *     field 0 is null, a serialized empty sketch is returned
   * @param k value passed to the {@code KllFloatsSketch} constructor
   * @return the resulting sketch serialized into a {@link DataByteArray}
   * @throws IOException propagated from reading tuple fields
   * @throws IllegalArgumentException if field 0 of a data tuple is neither a {@link DataBag}
   *     nor a {@link DataByteArray}
   */
  private static DataByteArray process(final Tuple inputTuple, final int k) throws IOException {
    final KllFloatsSketch sketch = new KllFloatsSketch(k);
    final DataBag outerBag = ((inputTuple != null) && (inputTuple.size() > 0))
        ? (DataBag) inputTuple.get(0)
        : null;
    // A null outer bag is treated like an empty input instead of failing in the loop below.
    if (outerBag != null) {
      for (final Tuple dataTuple: outerBag) {
        final Object f0 = dataTuple.get(0);
        if (f0 == null) { continue; }
        if (f0 instanceof DataBag) {
          final DataBag innerBag = (DataBag) f0; // inputTuple.bag0.dataTupleN.f0:bag
          // If field 0 of a dataTuple is a Bag all innerTuples of this inner bag
          // will be passed into the sketch.
          // It is due to system bagged outputs from multiple mapper Initial functions.
          // The Intermediate stage was bypassed.
          // An empty inner bag simply contributes nothing.
          for (final Tuple innerTuple: innerBag) {
            final Float value = (Float) innerTuple.get(0);
            // Skip null values rather than failing on unboxing, consistent with
            // the handling of a null field 0 above.
            if (value != null) {
              sketch.update(value);
            }
          }
        } else if (f0 instanceof DataByteArray) { // inputTuple.bag0.dataTupleN.f0:DBA
          // If field 0 of a dataTuple is a DataByteArray we assume it is a sketch
          // due to system bagged outputs from multiple mapper Intermediate functions.
          // Each dataTuple.DBA:sketch will be merged into the sketch.
          final DataByteArray dba = (DataByteArray) f0;
          sketch.merge(KllFloatsSketch.heapify(Memory.wrap(dba.get())));
        } else {
          // Both accepted types are named so the message matches the checks above.
          throw new IllegalArgumentException(
              "dataTuple.Field0: Is neither a DataBag nor a DataByteArray: "
              + f0.getClass().getName());
        }
      }
    }
    return new DataByteArray(sketch.toByteArray());
  }