public static VarOptItemsSketch heapify()

in src/main/java/org/apache/datasketches/sampling/VarOptItemsSketch.java [271:424]


  /**
   * Reconstructs an on-heap sketch from a serialized image held in {@code srcMem}.
   *
   * <p>The preamble fields are extracted and cross-checked before any item data is read.
   * An image flagged as empty yields a fresh sketch built only from {@code k} and the resize
   * factor. Otherwise the H-region weights, the optional marks (present only when the gadget
   * flag is set) and the items are read in that order, and a gap entry is inserted between the
   * H and R regions when the R region is non-empty.</p>
   *
   * @param <T> the type of item held by the sketch
   * @param srcMem memory holding the serialized sketch image
   * @param serDe serializer/deserializer used to decode the stored items
   * @return a sketch rebuilt from the serialized image
   * @throws SketchesArgumentException if the preamble values are invalid or mutually inconsistent,
   *         or if a stored H-region weight is not strictly positive
   */
  public static <T> VarOptItemsSketch<T> heapify(final Memory srcMem,
                                                 final ArrayOfItemsSerDe<T> serDe) {
    final int numPreLongs = getAndCheckPreLongs(srcMem);
    final ResizeFactor rf = ResizeFactor.getRF(extractResizeFactor(srcMem));
    final int serVer = extractSerVer(srcMem);
    final int familyId = extractFamilyID(srcMem);
    final int flags = extractFlags(srcMem);
    final boolean isEmpty = (flags & EMPTY_FLAG_MASK) != 0;
    final boolean isGadget = (flags & GADGET_FLAG_MASK) != 0;

    // Check values
    if (isEmpty) {
      if (numPreLongs != VO_PRELONGS_EMPTY) {
        throw new SketchesArgumentException("Possible corruption: Must be " + VO_PRELONGS_EMPTY
                + " for an empty sketch. Found: " + numPreLongs);
      }
    } else {
      if ((numPreLongs != VO_PRELONGS_WARMUP)
          && (numPreLongs != VO_PRELONGS_FULL)) {
        throw new SketchesArgumentException("Possible corruption: Must be " + VO_PRELONGS_WARMUP
                + " or " + VO_PRELONGS_FULL + " for a non-empty sketch. Found: " + numPreLongs);
      }
    }
    if (serVer != VAROPT_SER_VER) {
      throw new SketchesArgumentException(
              "Possible Corruption: Ser Ver must be " + VAROPT_SER_VER + ": " + serVer);
    }
    final int reqFamilyId = Family.VAROPT.getID();
    if (familyId != reqFamilyId) {
      throw new SketchesArgumentException(
              "Possible Corruption: FamilyID must be " + reqFamilyId + ": " + familyId);
    }

    final int k = extractK(srcMem);
    if (k < 1) {
      throw new SketchesArgumentException("Possible Corruption: k must be at least 1: " + k);
    }

    if (isEmpty) {
      assert numPreLongs == Family.VAROPT.getMinPreLongs();
      return new VarOptItemsSketch<>(k, rf);
    }

    final long n = extractN(srcMem);
    if (n < 0) {
      throw new SketchesArgumentException("Possible Corruption: n cannot be negative: " + n);
    }

    // get rest of preamble
    final int hCount = extractHRegionItemCount(srcMem);
    final int rCount = extractRRegionItemCount(srcMem);

    if (hCount < 0) {
      throw new SketchesArgumentException("Possible Corruption: H region count cannot be "
              + "negative: " + hCount);
    }
    if (rCount < 0) {
      throw new SketchesArgumentException("Possible Corruption: R region count cannot be "
              + "negative: " + rCount);
    }

    double totalRWeight = 0.0;
    if (numPreLongs == Family.VAROPT.getMaxPreLongs()) {
      if (rCount > 0) {
        totalRWeight = extractTotalRWeight(srcMem);
      } else {
        throw new SketchesArgumentException(
                "Possible Corruption: "
                        + Family.VAROPT.getMaxPreLongs() + " preLongs but no items in R region");
      }
    } else if (rCount > 0) {
      // Without the full preamble no total R weight is read, and the weight offset computed
      // below (which skips an extra double when rCount > 0) would no longer agree with
      // preLongBytes, which is used to locate the marks and the items.
      throw new SketchesArgumentException(
              "Possible Corruption: "
                      + numPreLongs + " preLongs but " + rCount + " items in R region");
    }

    final int preLongBytes = numPreLongs << 3;

    int allocatedItems = k + 1; // default to full

    if (rCount == 0) {
      // Every H item must fit in the arrays sized below, which never exceed k + 1 entries
      if (hCount > k) {
        throw new SketchesArgumentException("Possible Corruption: H region count cannot exceed k "
                + "when R region is empty: h = " + hCount + ", k = " + k);
      }

      // Not in sampling mode, so determine size to allocate, using ceilingLog2(hCount) as minimum
      final int ceilingLgK = Util.exactLog2OfInt(Util.ceilingPowerOf2(k), "heapify");
      final int minLgSize = Util.exactLog2OfInt(Util.ceilingPowerOf2(hCount), "heapify");
      final int initialLgSize = SamplingUtil.startingSubMultiple(ceilingLgK, rf.lg(),
              Math.max(minLgSize, MIN_LG_ARR_ITEMS));

      allocatedItems = SamplingUtil.getAdjustedSize(k, 1 << initialLgSize);
      if (allocatedItems == k) {
        ++allocatedItems;
      }
    } else {
      // Sampling mode: the lists built below hold hCount + rCount items plus one gap entry and
      // are declared with a capacity of k + 1. The sum is formed as a long so that an
      // overflowing pair of counts is rejected rather than wrapping to a negative int.
      if (((long) hCount + rCount) != k) {
        throw new SketchesArgumentException("Possible Corruption: H and R region counts must "
                + "sum to k in sampling mode: h = " + hCount + ", r = " + rCount + ", k = " + k);
      }
    }

    final int totalItems = hCount + rCount; // cannot overflow after the checks above

    // allocate full-sized ArrayLists, but we store only hCount weights at any moment
    final long weightOffsetBytes = TOTAL_WEIGHT_R_DOUBLE + (rCount > 0 ? Double.BYTES : 0);
    final ArrayList<Double> weightList = new ArrayList<>(allocatedItems);
    final double[] wts = new double[hCount]; // only the H-region weights are stored in the image
    srcMem.getDoubleArray(weightOffsetBytes, wts, 0, hCount);
    // can't use Arrays.asList(wts) since double[] rather than Double[]
    for (int i = 0; i < hCount; ++i) {
      if (wts[i] <= 0.0) {
        throw new SketchesArgumentException("Possible Corruption: "
                + "Non-positive weight in heapify(): " + wts[i]);
      }
      weightList.add(wts[i]);
    }

    // marks, if we have a gadget
    long markBytes = 0;
    int markCount = 0;
    ArrayList<Boolean> markList = null;
    if (isGadget) {
      // marks are stored immediately after the hCount weights
      final long markOffsetBytes = preLongBytes + ((long) hCount * Double.BYTES);
      markBytes = ArrayOfBooleansSerDe.computeBytesNeeded(hCount);
      markList = new ArrayList<>(allocatedItems);

      final ArrayOfBooleansSerDe booleansSerDe = new ArrayOfBooleansSerDe();
      final Boolean[] markArray = booleansSerDe.deserializeFromMemory(
              srcMem.region(markOffsetBytes, (hCount >>> 3) + 1), 0, hCount);

      for (final Boolean mark : markArray) {
        if (mark) { ++markCount; }
      }
      markList.addAll(Arrays.asList(markArray));
    }

    // items follow the weights and, when present, the marks
    final long offsetBytes = preLongBytes + ((long) hCount * Double.BYTES) + markBytes;
    final T[] data = serDe.deserializeFromMemory(
            srcMem.region(offsetBytes, srcMem.getCapacity() - offsetBytes), 0, totalItems);
    final List<T> wrappedData = Arrays.asList(data);
    final ArrayList<T> dataList = new ArrayList<>(allocatedItems);
    dataList.addAll(wrappedData.subList(0, hCount));

    // Load items in R as needed
    if (rCount > 0) {
      weightList.add(-1.0); // the gap
      if (isGadget) { markList.add(false); } // the gap
      // R-region entries carry no individual weight or mark in the image; use placeholders
      for (int i = 0; i < rCount; ++i) {
        weightList.add(-1.0);
        if (isGadget) { markList.add(false); }
      }

      dataList.add(null); // the gap
      dataList.addAll(wrappedData.subList(hCount, totalItems));
    }

    final VarOptItemsSketch<T> sketch =
            new VarOptItemsSketch<>(dataList, weightList, k, n,
                    allocatedItems, rf, hCount, rCount, totalRWeight);

    if (isGadget) {
      sketch.marks_ = markList;
      sketch.numMarksInH_ = markCount;
    }

    return sketch;
  }