in tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java [320:416]
/**
 * Write loop executed by a single perf writer thread.
 *
 * <p>Each pass of the inner loop submits one record to every journal in {@code journals}, in order.
 * The target ledger is picked at random among this thread's ledgers and entry ids are assigned
 * per ledger, starting at 0. The same random payload of {@code flags.recordSize} bytes is reused
 * for every record.
 *
 * <p>NOTE(review): the loop has no exit of its own; it only calls {@code markPerfDone()} once a
 * record or byte target is reached. Presumably {@code markPerfDone()} terminates the run — confirm.
 *
 * @param threadIdx index of this writer thread, used to derive the ledger ids it writes to
 * @param journals journals to write to; every journal receives one record per pass
 * @param numLedgersForThisThread number of distinct ledgers this thread spreads its records over
 * @param writeRate rate limiter permits per second, where each record consumes as many permits
 *                  as it has bytes; a value {@code <= 0} disables rate limiting
 * @param maxOutstandingBytesForThisThread cap on the bytes submitted but not yet acknowledged;
 *                  a value {@code <= 0} disables the cap
 * @param numRecordsForThisThread record count after which {@code markPerfDone()} is invoked;
 *                  a value {@code <= 0} disables the check
 * @param numBytesForThisThread byte count after which {@code markPerfDone()} is invoked;
 *                  a value {@code <= 0} disables the check
 * @throws Exception if the thread is interrupted while waiting for permits, or if submitting a
 *                   record or {@code markPerfDone()} fails
 */
void write(int threadIdx,
           Journal[] journals,
           int numLedgersForThisThread,
           double writeRate,
           int maxOutstandingBytesForThisThread,
           long numRecordsForThisThread,
           long numBytesForThisThread) throws Exception {
    log.info("Write thread {} started with : rate = {},"
            + " num records = {}, num bytes = {}, max outstanding bytes = {}",
        threadIdx,
        writeRate,
        numRecordsForThisThread,
        numBytesForThisThread,
        maxOutstandingBytesForThisThread);

    final RateLimiter limiter = writeRate > 0 ? RateLimiter.create(writeRate) : null;

    // Acquire 1 second worth of permits to have a slower ramp-up.
    // A fractional rate (0 < writeRate < 1) truncates to 0 and the limiter rejects non-positive
    // permit counts, so always ask for at least one permit.
    if (limiter != null) {
        limiter.acquire(Math.max(1, (int) writeRate));
    }

    byte[] payload = new byte[flags.recordSize];
    ThreadLocalRandom.current().nextBytes(payload);
    ByteBuf payloadBuf = Unpooled.wrappedBuffer(payload);
    // Every record is a duplicate of the same payload, so its length never changes.
    final int len = payloadBuf.readableBytes();

    final Semaphore semaphore;
    if (maxOutstandingBytesForThisThread > 0) {
        int permits = maxOutstandingBytesForThisThread;
        if (permits < len) {
            // A record needs `len` permits at once; with fewer permits in total the very first
            // acquire would block forever. Allow exactly one record in flight instead.
            log.warn("Write thread {} : max outstanding bytes {} is smaller than the record size {},"
                    + " allowing one record in flight",
                threadIdx, permits, len);
            permits = len;
        }
        semaphore = new Semaphore(permits);
    } else {
        semaphore = null;
    }

    long totalWritten = 0L;
    long totalBytesWritten = 0L;
    final int numJournals = journals.length;
    long[] entryIds = new long[numLedgersForThisThread];
    Arrays.fill(entryIds, 0L);

    while (true) {
        for (int i = 0; i < numJournals; i++) {
            int ledgerIdx = ThreadLocalRandom.current().nextInt(numLedgersForThisThread);
            // Widen before multiplying so large thread/ledger counts cannot overflow int.
            long lid = (long) threadIdx * numLedgersForThisThread + ledgerIdx;
            long eid = entryIds[ledgerIdx]++;

            if (numRecordsForThisThread > 0
                && totalWritten >= numRecordsForThisThread) {
                markPerfDone();
            }
            if (numBytesForThisThread > 0
                && totalBytesWritten >= numBytesForThisThread) {
                markPerfDone();
            }

            // Back-pressure: wait until enough previously submitted bytes were acknowledged.
            if (null != semaphore) {
                semaphore.acquire(len);
            }
            totalWritten++;
            totalBytesWritten += len;
            if (null != limiter) {
                limiter.acquire(len);
            }

            // Retain the buffer only once nothing between here and the hand-off can throw,
            // so an interrupted wait above does not leave a dangling reference.
            final ByteBuf buf = payloadBuf.retainedDuplicate();
            final long sendTime = System.nanoTime();
            journals[i].logAddEntry(
                lid,
                eid,
                buf,
                false,
                (rc, ledgerId, entryId, addr, ctx) -> {
                    ReferenceCountUtil.release(buf);
                    if (0 == rc) {
                        if (null != semaphore) {
                            semaphore.release(len);
                        }
                        recordsWritten.increment();
                        bytesWritten.add(len);
                        long latencyMicros = TimeUnit.NANOSECONDS.toMicros(
                            System.nanoTime() - sendTime
                        );
                        recorder.recordValue(latencyMicros);
                        cumulativeRecorder.recordValue(latencyMicros);
                    } else {
                        // Any failed write aborts the whole run.
                        log.warn("Error at writing records : ", BookieException.create(rc));
                        Runtime.getRuntime().exit(-1);
                    }
                },
                null
            );
        }
    }
}