in HyperAlloc/src/main/java/com/amazon/corretto/benchmark/hyperalloc/TaskBase.java [57:119]
/**
 * Builds a single-threaded allocation task.
 *
 * <p>Until {@code durationInMs} has elapsed, the task repeatedly creates objects with
 * {@code AllocObject.create(minObjectSize, maxObjectSize, null)}, charging each object's
 * real size against a {@code TokenBucket} constructed with {@code rateInMb * 1024 * 1024}.
 * Allocation proceeds in waves of one tenth of that rate; whenever the bucket reports that
 * it is throttled, the task sleeps for {@code DEFAULT_SLEEP_TIME_IN_MILLISECONDS} and starts
 * a new wave.
 *
 * <p>Every new object is appended to a local survivor queue. Once the queue holds more than
 * {@code queueLength} objects, the oldest one is removed. Every {@code longLivedRate}-th
 * removed object is offered to {@code store}; the others are simply dropped. The interval
 * is halved after a successful {@code tryAdd} (while it is above
 * {@code MAX_LONG_LIVED_RATIO}) and doubled after a rejected one (while it is below
 * {@code MIN_LONG_LIVED_RATIO}).
 *
 * <p>If {@code rampUpSeconds} is positive, the bucket's throttle follows
 * {@code sinusoidalRampUp(rate, rampUpSeconds)} for the first {@code rampUpSeconds} seconds
 * and is then set to the full rate.
 *
 * @param store         store that is offered objects evicted from the survivor queue
 * @param rateInMb      target allocation rate in units of 1024 * 1024 bytes
 *                      (NOTE(review): presumably per second - confirm against TokenBucket)
 * @param durationInMs  how long the task runs, in milliseconds
 * @param minObjectSize first size argument passed to {@code AllocObject.create}
 * @param maxObjectSize second size argument passed to {@code AllocObject.create}
 * @param queueLength   maximum number of most recent objects kept in the survivor queue
 * @param rampUpSeconds length of the ramp-up phase in seconds; values {@code <= 0} disable it
 * @return a task whose result is {@code TokenBucket.getCurrent()} read when the run ends;
 *         the task propagates {@link InterruptedException} if interrupted while sleeping
 */
static Callable<Long> createSingle(final ObjectStore store, final long rateInMb,
                                   final long durationInMs, final int minObjectSize,
                                   final int maxObjectSize, final int queueLength,
                                   double rampUpSeconds) {
    return () -> {
        final long rate = rateInMb * 1024 * 1024;
        // Loop-invariant amount of bytes allocated per wave.
        final long waveSize = rate / 10;
        final ArrayDeque<AllocObject> survivorQueue = new ArrayDeque<>();
        final long start = System.nanoTime();
        final long end = start + durationInMs * 1000000;

        Function<Double, Double> rampUp = null;
        if (rampUpSeconds > 0) {
            rampUp = sinusoidalRampUp(rate, rampUpSeconds);
        }

        final TokenBucket throughput = new TokenBucket(rate);
        int longLivedRate = MAX_LONG_LIVED_RATIO;
        int longLivedCounter = longLivedRate;

        while (System.nanoTime() < end) {
            if (waveSize <= 0) {
                // With a rate below 10 bytes the wave quota is zero and the allocation
                // loop below would never run; sleep instead of spinning on the clock
                // until the deadline.
                Thread.sleep(DEFAULT_SLEEP_TIME_IN_MILLISECONDS);
                continue;
            }

            long wave = 0L;
            while (wave < waveSize) {
                if (rampUp != null) {
                    // Cast before dividing so the elapsed time keeps its fractional part
                    // even if NANOS_PER_SECOND is declared as an integral type (its
                    // declaration is outside this block).
                    final double elapsedSeconds =
                            (double) (System.nanoTime() - start) / NANOS_PER_SECOND;
                    if (elapsedSeconds < rampUpSeconds) {
                        final double currentRate = rampUp.apply(elapsedSeconds);
                        throughput.adjustThrottle((long) currentRate);
                    } else {
                        // Ramp-up is over: pin the throttle to the full rate and stop
                        // re-evaluating the ramp on every iteration.
                        throughput.adjustThrottle(rate);
                        rampUp = null;
                    }
                }

                if (throughput.isThrottled()) {
                    // Back off briefly, then begin a fresh wave.
                    Thread.sleep(DEFAULT_SLEEP_TIME_IN_MILLISECONDS);
                    break;
                }

                final AllocObject obj = AllocObject.create(minObjectSize, maxObjectSize, null);
                throughput.deduct(obj.getRealSize());
                wave += obj.getRealSize();
                survivorQueue.addLast(obj);

                if (survivorQueue.size() > queueLength) {
                    final AllocObject removed = survivorQueue.poll();
                    // Only every longLivedRate-th evicted object is offered to the store.
                    if (--longLivedCounter == 0) {
                        if (store.tryAdd(removed)) {
                            // Store accepted the object: offer more often.
                            if (longLivedRate > MAX_LONG_LIVED_RATIO) {
                                longLivedRate /= 2;
                            }
                        } else {
                            // Store rejected the object: offer less often.
                            if (longLivedRate < MIN_LONG_LIVED_RATIO) {
                                longLivedRate *= 2;
                            }
                        }
                        longLivedCounter = longLivedRate;
                    }
                }
            }
        }
        return throughput.getCurrent();
    };
}