in accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java [317:397]
private void savePendingCheckpointAndResetScanDistance(int checkpointIndex)
{
// use the tail of checkpointListBuf to buffer ranges we plan to tenure
ensureCapacity(tenured.count() + scan.watermark());
scan.reset();
if (isAccurate)
{
// TODO (low priority, efficiency): we can shift back the existing scanDistance if it's far enough from
// the next checkpoint. this might permit us to skip some comparisons
scan.resetPeakMax(tenured);
for (Tenured<Ranges, Range, RoutingKey> tenured : this.tenured)
{
int distanceToEnd = (tenured.lastIndex - checkpointIndex);
if (distanceToEnd >= scan.peakMax)
break;
int scanDistance = tenured.lastIndex - tenured.index;
if (scanDistance <= scan.peakMax)
scan.updateScanDistance(tenured.index, scanDistance, null);
}
if (scan.watermark() < scan.goal)
{
int ri = Scan.minScanIndex(checkpointIndex, scan.goal);
while (ri < checkpointIndex)
{
RoutingKey end = accessor.end(ranges, ri);
int scanLimit = scanLimit(ri, scan.peakMax);
if (!shouldTenure(end, scanLimit))
scan.update(end, ri, ranges, scanLimit, null);
++ri;
}
}
}
else
{
// the maximum scan distance that could ever have been adopted for last chunk
int oldPeakMax = scan.peakMax();
// the minimum scan distance we will start with for processing the proceeding ranges
// note: this may increase if we decide to tenure additional ranges, at which point it will be the actual newPeakMax
int newMinPeakMax = scan.newPeakMax(tenured);
int minUntenuredIndex = scan.minUntenuredIndex(checkpointIndex, tenured);
int minScanIndex = Scan.minScanIndex(checkpointIndex, newMinPeakMax);
// we now make sure tenured and scan are correct for the new parameters.
// 1) if our peakMax is lower then we need to go back and find items to tenure that we previously marked for scanning
// 2) we must also reset our scan distances
// since our peakMax is determined by tenured.count(), but we are tenuring items here we keep things simple
// and do not account for those items we tenure but would later permit to scan as our peakMax grows
int ri = Math.min(minUntenuredIndex, minScanIndex);
while (ri < checkpointIndex)
{
RoutingKey end = accessor.end(ranges, ri);
int newPeakMax = scan.newPeakMax(tenured);
int scanLimit = scanLimit(ri, newPeakMax);
if (shouldTenure(end, scanLimit))
{
// note: might have already been tenured
// in this case our untenureLimit may be incorrect, but we won't use it
if (ri >= minUntenuredIndex && newPeakMax < oldPeakMax)
tenured.tenure(end, ri, ranges, scanLimit + 1, scanLimit(ri, oldPeakMax));
}
else
{
// this might effectively remove a previously tenured item
scan.update(end, ri, ranges, scanLimit, null);
}
++ri;
}
scan.resetPeakMax(tenured);
}
pending.atIndex = checkpointIndex;
pending.clear();
tenured.rollover(pending);
}