in src/main/java/org/apache/datasketches/sampling/VarOptItemsSketch.java [271:424]
public static <T> VarOptItemsSketch<T> heapify(final Memory srcMem,
final ArrayOfItemsSerDe<T> serDe) {
final int numPreLongs = getAndCheckPreLongs(srcMem);
final ResizeFactor rf = ResizeFactor.getRF(extractResizeFactor(srcMem));
final int serVer = extractSerVer(srcMem);
final int familyId = extractFamilyID(srcMem);
final int flags = extractFlags(srcMem);
final boolean isEmpty = (flags & EMPTY_FLAG_MASK) != 0;
final boolean isGadget = (flags & GADGET_FLAG_MASK) != 0;
// Check values
if (isEmpty) {
if (numPreLongs != VO_PRELONGS_EMPTY) {
throw new SketchesArgumentException("Possible corruption: Must be " + VO_PRELONGS_EMPTY
+ " for an empty sketch. Found: " + numPreLongs);
}
} else {
if ((numPreLongs != VO_PRELONGS_WARMUP)
&& (numPreLongs != VO_PRELONGS_FULL)) {
throw new SketchesArgumentException("Possible corruption: Must be " + VO_PRELONGS_WARMUP
+ " or " + VO_PRELONGS_FULL + " for a non-empty sketch. Found: " + numPreLongs);
}
}
if (serVer != VAROPT_SER_VER) {
throw new SketchesArgumentException(
"Possible Corruption: Ser Ver must be " + VAROPT_SER_VER + ": " + serVer);
}
final int reqFamilyId = Family.VAROPT.getID();
if (familyId != reqFamilyId) {
throw new SketchesArgumentException(
"Possible Corruption: FamilyID must be " + reqFamilyId + ": " + familyId);
}
final int k = extractK(srcMem);
if (k < 1) {
throw new SketchesArgumentException("Possible Corruption: k must be at least 1: " + k);
}
if (isEmpty) {
assert numPreLongs == Family.VAROPT.getMinPreLongs();
return new VarOptItemsSketch<>(k, rf);
}
final long n = extractN(srcMem);
if (n < 0) {
throw new SketchesArgumentException("Possible Corruption: n cannot be negative: " + n);
}
// get rest of preamble
final int hCount = extractHRegionItemCount(srcMem);
final int rCount = extractRRegionItemCount(srcMem);
if (hCount < 0) {
throw new SketchesArgumentException("Possible Corruption: H region count cannot be "
+ "negative: " + hCount);
}
if (rCount < 0) {
throw new SketchesArgumentException("Possible Corruption: R region count cannot be "
+ "negative: " + rCount);
}
double totalRWeight = 0.0;
if (numPreLongs == Family.VAROPT.getMaxPreLongs()) {
if (rCount > 0) {
totalRWeight = extractTotalRWeight(srcMem);
} else {
throw new SketchesArgumentException(
"Possible Corruption: "
+ Family.VAROPT.getMaxPreLongs() + " preLongs but no items in R region");
}
}
final int preLongBytes = numPreLongs << 3;
final int totalItems = hCount + rCount;
int allocatedItems = k + 1; // default to full
if (rCount == 0) {
// Not in sampling mode, so determine size to allocate, using ceilingLog2(hCount) as minimum
final int ceilingLgK = Util.exactLog2OfInt(Util.ceilingPowerOf2(k), "heapify");
final int minLgSize = Util.exactLog2OfInt(Util.ceilingPowerOf2(hCount), "heapify");
final int initialLgSize = SamplingUtil.startingSubMultiple(ceilingLgK, rf.lg(),
Math.max(minLgSize, MIN_LG_ARR_ITEMS));
allocatedItems = SamplingUtil.getAdjustedSize(k, 1 << initialLgSize);
if (allocatedItems == k) {
++allocatedItems;
}
}
// allocate full-sized ArrayLists, but we store only hCount weights at any moment
final long weightOffsetBytes = TOTAL_WEIGHT_R_DOUBLE + (rCount > 0 ? Double.BYTES : 0);
final ArrayList<Double> weightList = new ArrayList<>(allocatedItems);
final double[] wts = new double[allocatedItems];
srcMem.getDoubleArray(weightOffsetBytes, wts, 0, hCount);
// can't use Arrays.asList(wts) since double[] rather than Double[]
for (int i = 0; i < hCount; ++ i) {
if (wts[i] <= 0.0) {
throw new SketchesArgumentException("Possible Corruption: "
+ "Non-positive weight in heapify(): " + wts[i]);
}
weightList.add(wts[i]);
}
// marks, if we have a gadget
long markBytes = 0;
int markCount = 0;
ArrayList<Boolean> markList = null;
if (isGadget) {
final long markOffsetBytes = preLongBytes + ((long) hCount * Double.BYTES);
markBytes = ArrayOfBooleansSerDe.computeBytesNeeded(hCount);
markList = new ArrayList<>(allocatedItems);
final ArrayOfBooleansSerDe booleansSerDe = new ArrayOfBooleansSerDe();
final Boolean[] markArray = booleansSerDe.deserializeFromMemory(
srcMem.region(markOffsetBytes, (hCount >>> 3) + 1), 0, hCount);
for (Boolean mark : markArray) {
if (mark) { ++markCount; }
}
markList.addAll(Arrays.asList(markArray));
}
final long offsetBytes = preLongBytes + ((long) hCount * Double.BYTES) + markBytes;
final T[] data = serDe.deserializeFromMemory(
srcMem.region(offsetBytes, srcMem.getCapacity() - offsetBytes), 0, totalItems);
final List<T> wrappedData = Arrays.asList(data);
final ArrayList<T> dataList = new ArrayList<>(allocatedItems);
dataList.addAll(wrappedData.subList(0, hCount));
// Load items in R as needed
if (rCount > 0) {
weightList.add(-1.0); // the gap
if (isGadget) { markList.add(false); } // the gap
for (int i = 0; i < rCount; ++i) {
weightList.add(-1.0);
if (isGadget) { markList.add(false); }
}
dataList.add(null); // the gap
dataList.addAll(wrappedData.subList(hCount, totalItems));
}
final VarOptItemsSketch<T> sketch =
new VarOptItemsSketch<>(dataList, weightList, k, n,
allocatedItems, rf, hCount, rCount, totalRWeight);
if (isGadget) {
sketch.marks_ = markList;
sketch.numMarksInH_ = markCount;
}
return sketch;
}