static DataByteArray process()

in src/main/java/org/apache/datasketches/pig/cpc/AlgebraicFinal.java [71:129]


  /**
   * Collapses everything found in the outer bag (field 0 of {@code inputTuple}) into one
   * serialized CPC sketch.
   *
   * <p>Each tuple of the outer bag is inspected through its field 0:
   * <ul>
   * <li>{@code null} fields and empty inner bags are skipped;</li>
   * <li>a non-empty inner {@link DataBag} is handed to {@code DataToSketch.updateSketch} when
   * {@code isInputRaw} is true, otherwise to {@code UnionSketch.updateUnion};</li>
   * <li>a {@link DataByteArray} is heapified as a sketch and merged into the union.</li>
   * </ul>
   * The sketch and the union are created lazily, only when the first item that needs them is
   * encountered. If both end up being used, the sketch is merged into the union before the
   * result is produced.
   *
   * @param inputTuple tuple whose field 0 is expected to be the outer {@link DataBag}
   * @param lgK passed to the {@code CpcSketch} / {@code CpcUnion} constructors
   * @param seed passed to the constructors, to {@code UnionSketch.updateUnion} and to
   *     {@code CpcSketch.heapify}
   * @param isInputRaw selects how inner bags are consumed (see above)
   * @return the serialized result, or {@code null} when the input tuple is null or empty, the
   *     outer bag is null, or nothing was fed to either the sketch or the union
   * @throws IOException declared for the tuple/bag accessors and helper calls
   * @throws IllegalArgumentException if a field 0 is neither a DataBag nor a DataByteArray
   */
  static DataByteArray process(final Tuple inputTuple, final int lgK, final long seed,
      final boolean isInputRaw) throws IOException {
    if (inputTuple == null) {
      return null;
    }
    if (inputTuple.size() == 0) {
      return null;
    }
    final DataBag bag0 = (DataBag) inputTuple.get(0);
    if (bag0 == null) {
      return null;
    }

    CpcSketch rawSketch = null; // accumulates inner bags when isInputRaw is true
    CpcUnion merger = null;     // accumulates everything else

    for (final Tuple entry : bag0) {
      final Object field = entry.get(0); // inputTuple.bag0.entryN.f0
      if (field == null) {
        continue;
      }

      if (field instanceof DataBag) {
        // A bag in field 0: every tuple of this inner bag is consumed. Per the original
        // notes this is system-bagged output of several mapper Initial functions, with the
        // Intermediate stage bypassed.
        final DataBag nested = (DataBag) field;
        if (nested.size() != 0) {
          if (isInputRaw) {
            if (rawSketch == null) {
              rawSketch = new CpcSketch(lgK, seed);
            }
            DataToSketch.updateSketch(nested, rawSketch);
          } else {
            if (merger == null) {
              merger = new CpcUnion(lgK, seed);
            }
            UnionSketch.updateUnion(nested, merger, seed);
          }
        }
        continue;
      }

      if (!(field instanceof DataByteArray)) { // we should never get here
        throw new IllegalArgumentException("dataTuple.Field0 is not a DataBag or DataByteArray: "
            + field.getClass().getName());
      }

      // A DataByteArray in field 0 is assumed to be a serialized sketch (system-bagged output
      // of mapper Intermediate functions); it is merged into the union.
      final byte[] serialized = ((DataByteArray) field).get();
      if (merger == null) {
        merger = new CpcUnion(lgK, seed);
      }
      merger.update(CpcSketch.heapify(serialized, seed));
    }

    if (merger == null) {
      // Only the raw sketch (if anything at all) was populated.
      return (rawSketch == null) ? null : new DataByteArray(rawSketch.toByteArray());
    }
    if (rawSketch != null) {
      // Both accumulators were used: fold the raw sketch into the union first.
      merger.update(rawSketch);
    }
    return new DataByteArray(merger.getResult().toByteArray());
  }