void write()

in tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java [320:416]


    /**
     * Write loop executed by a single writer thread.
     *
     * <p>Each pass of the outer loop issues one add per journal in {@code journals}. Every add
     * targets a ledger picked at random among the {@code numLedgersForThisThread} ledgers derived
     * from {@code threadIdx}, and uses that ledger's next entry id. All records share the same
     * random payload of {@code flags.recordSize} bytes.
     *
     * <p>The loop itself never terminates: when one of the configured limits is reached it calls
     * {@code markPerfDone()} and, if that call returns, simply keeps writing.
     * NOTE(review): presumably {@code markPerfDone()} ends the run — confirm against its definition.
     *
     * @param threadIdx index of this writer thread; used for logging and to compute ledger ids
     * @param journals journals to write to; one add is issued to each of them per pass
     * @param numLedgersForThisThread number of distinct ledgers this thread writes to
     * @param writeRate permits per second handed to the rate limiter; each write acquires as many
     *                  permits as the record has bytes. A non-positive value disables rate limiting
     * @param maxOutstandingBytesForThisThread cap on the bytes submitted but not yet successfully
     *                  acknowledged; a non-positive value disables the cap
     * @param numRecordsForThisThread when positive, {@code markPerfDone()} is called once this many
     *                  records have been submitted
     * @param numBytesForThisThread when positive, {@code markPerfDone()} is called once this many
     *                  bytes have been submitted
     * @throws Exception if the thread is interrupted while waiting for outstanding-bytes permits,
     *                   or if {@code markPerfDone()} or the journal add throws
     */
    void write(int threadIdx,
               Journal[] journals,
               int numLedgersForThisThread,
               double writeRate,
               int maxOutstandingBytesForThisThread,
               long numRecordsForThisThread,
               long numBytesForThisThread) throws Exception {
        log.info("Write thread {} started with : rate = {},"
                + " num records = {}, num bytes = {}, max outstanding bytes = {}",
            threadIdx,
            writeRate,
            numRecordsForThisThread,
            numBytesForThisThread,
            maxOutstandingBytesForThisThread);

        final RateLimiter limiter;
        if (writeRate > 0) {
            limiter = RateLimiter.create(writeRate);
        } else {
            limiter = null;
        }
        final Semaphore semaphore;
        if (maxOutstandingBytesForThisThread > 0) {
            semaphore = new Semaphore(maxOutstandingBytesForThisThread);
        } else {
            semaphore = null;
        }

        // Drain roughly one second's worth of permits up front to get a slower ramp-up.
        if (limiter != null) {
            // A rate below 1 truncates to 0 permits, which RateLimiter.acquire rejects with an
            // IllegalArgumentException; there is nothing to drain in that case, so skip it.
            final int rampUpPermits = (int) writeRate;
            if (rampUpPermits > 0) {
                limiter.acquire(rampUpPermits);
            }
        }

        long totalWritten = 0L;
        long totalBytesWritten = 0L;
        final int numJournals = journals.length;
        byte[] payload = new byte[flags.recordSize];
        ThreadLocalRandom.current().nextBytes(payload);
        ByteBuf payloadBuf = Unpooled.wrappedBuffer(payload);
        // Every record is a duplicate of the same payload, so its length never changes.
        final int len = payloadBuf.readableBytes();
        long[] entryIds = new long[numLedgersForThisThread];
        Arrays.fill(entryIds, 0L);
        while (true) {
            for (int i = 0; i < numJournals; i++) {
                int ledgerIdx = ThreadLocalRandom.current().nextInt(numLedgersForThisThread);
                // Widen before multiplying: the product of two ints can overflow and wrap around
                // before it would otherwise be promoted to long.
                long lid = (long) threadIdx * numLedgersForThisThread + ledgerIdx;
                long eid = entryIds[ledgerIdx]++;

                if (numRecordsForThisThread > 0
                    && totalWritten >= numRecordsForThisThread) {
                    markPerfDone();
                }
                if (numBytesForThisThread > 0
                    && totalBytesWritten >= numBytesForThisThread) {
                    markPerfDone();
                }
                if (null != semaphore) {
                    // Blocks until enough outstanding bytes have been acknowledged.
                    semaphore.acquire(len);
                }

                totalWritten++;
                totalBytesWritten += len;
                if (null != limiter) {
                    limiter.acquire(len);
                }
                // Retain the buffer only right before handing it to the journal, so that an
                // exception from the checks or the semaphore above cannot leave behind a
                // reference that nobody releases.
                final ByteBuf buf = payloadBuf.retainedDuplicate();
                final long sendTime = System.nanoTime();
                journals[i].logAddEntry(
                    lid,
                    eid,
                    buf,
                    false,
                    (rc, ledgerId, entryId, addr, ctx) -> {
                        ReferenceCountUtil.release(buf);
                        if (0 == rc) {
                            if (null != semaphore) {
                                semaphore.release(len);
                            }

                            recordsWritten.increment();
                            bytesWritten.add(len);

                            long latencyMicros = TimeUnit.NANOSECONDS.toMicros(
                                System.nanoTime() - sendTime
                            );
                            recorder.recordValue(latencyMicros);
                            cumulativeRecorder.recordValue(latencyMicros);
                        } else {
                            // Any failed write aborts the whole process.
                            log.warn("Error at writing records : ", BookieException.create(rc));
                            Runtime.getRuntime().exit(-1);
                        }
                    },
                    null
                );
            }
        }
    }