private void runProducer()

in pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java [483:720]


    /**
     * Runs one producer worker: builds a client, creates the producers for every configured topic,
     * then publishes messages asynchronously in a rate-limited loop until a stop condition is met.
     *
     * <p>The loop stops when the configured test duration has elapsed (only checked when
     * {@code testTime > 0}), when the number of acknowledged sends reaches {@code numMessages}
     * (only checked when {@code numMessages > 0}), when the thread is interrupted while blocked,
     * or when an unexpected error is thrown. In every case {@code doneLatch} is counted down exactly
     * once and the client, if it was built, is closed.
     *
     * <p>When transactions are enabled, each message is sent inside the current transaction; after
     * {@code numMessagesPerTransaction} sends the transaction is committed (or aborted when
     * {@code isAbortTransaction} is set) and a new one is opened, retrying until that succeeds.
     *
     * @param producerId      identifier handed to {@code createProducerBuilder}
     * @param arguments       parsed arguments used to build the client
     * @param numMessages     number of acknowledged messages after which to stop; values
     *                        {@code <= 0} disable this limit
     * @param msgRate         permits per second given to the rate limiter that paces the send loop
     * @param payloadByteList candidate payloads; one is picked at random per message when
     *                        {@code payloadFilename} is set
     * @param payloadBytes    fixed payload used when {@code payloadFilename} is not set
     * @param doneLatch       latch counted down once when this worker stops
     */
    private void runProducer(int producerId,
                                    PerformanceProducer arguments,
                                    long numMessages,
                                    int msgRate,
                                    List<byte[]> payloadByteList,
                                    byte[] payloadBytes,
                                    CountDownLatch doneLatch) {
        PulsarClient client = null;
        boolean produceEnough = false;
        // Set when an InterruptedException is observed; the interrupt status is restored after cleanup.
        boolean interrupted = false;
        try {
            // Now processing command line arguments
            List<Future<Producer<byte[]>>> futures = new ArrayList<>();


            ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments)
                    .enableTransaction(this.isEnableTransaction);

            client = clientBuilder.build();

            ProducerBuilder<byte[]> producerBuilder = createProducerBuilder(client, producerId);

            // Holds the transaction currently used for sends; holds null when transactions are disabled.
            AtomicReference<Transaction> transactionAtomicReference;
            if (this.isEnableTransaction) {
                producerBuilder.sendTimeout(0, TimeUnit.SECONDS);
                transactionAtomicReference = new AtomicReference<>(client.newTransaction()
                        .withTransactionTimeout(this.transactionTimeout, TimeUnit.SECONDS)
                        .build()
                        .get());
            } else {
                transactionAtomicReference = new AtomicReference<>(null);
            }

            // Kick off creation of all producers first, then wait for them, so they are created concurrently.
            for (int i = 0; i < this.numTopics; i++) {

                String topic = this.topics.get(i);
                log.info("Adding {} publishers on topic {}", this.numProducers, topic);

                for (int j = 0; j < this.numProducers; j++) {
                    ProducerBuilder<byte[]> prodBuilder = producerBuilder.clone().topic(topic);
                    if (this.chunkingAllowed) {
                        prodBuilder.enableChunking(true);
                        prodBuilder.enableBatching(false);
                    }
                    futures.add(prodBuilder.createAsync());
                }
            }

            final List<Producer<byte[]>> producers = new ArrayList<>(futures.size());
            for (Future<Producer<byte[]>> future : futures) {
                producers.add(future.get());
            }
            Collections.shuffle(producers);

            log.info("Created {} producers", producers.size());

            RateLimiter rateLimiter = RateLimiter.create(msgRate);

            long startTime = System.nanoTime();
            long warmupEndTime = startTime + (long) (this.warmupTimeSeconds * 1e9);
            long testEndTime = startTime + (long) (this.testTime * 1e9);
            MessageKeyGenerationMode msgKeyMode = null;
            if (isNotBlank(this.messageKeyGenerationMode)) {
                try {
                    msgKeyMode = MessageKeyGenerationMode.valueOf(this.messageKeyGenerationMode);
                } catch (IllegalArgumentException e) {
                    // Keep the original exception as the cause so the offending value is not lost.
                    throw new IllegalArgumentException(
                            "messageKeyGenerationMode only support [autoIncrement, random]", e);
                }
            }
            // Send messages on all topics/producers
            // totalSent counts acknowledged sends (incremented in the send callback).
            AtomicLong totalSent = new AtomicLong(0);
            // numMessageSend counts sends issued within the current transaction.
            AtomicLong numMessageSend = new AtomicLong(0);
            Semaphore numMsgPerTxnLimit = new Semaphore(this.numMessagesPerTransaction);
            while (!produceEnough) {
                for (Producer<byte[]> producer : producers) {
                    if (this.testTime > 0 && System.nanoTime() > testEndTime) {
                        log.info("------------- DONE (reached the maximum duration: [{} seconds] of production) "
                                + "--------------", this.testTime);
                        doneLatch.countDown();
                        produceEnough = true;
                        break;
                    }

                    if (numMessages > 0 && totalSent.get() >= numMessages) {
                        log.info("------------- DONE (reached the maximum number: {} of production) --------------"
                                , numMessages);
                        doneLatch.countDown();
                        produceEnough = true;
                        break;
                    }
                    rateLimiter.acquire();
                    //if transaction is disable, transaction will be null.
                    Transaction transaction = transactionAtomicReference.get();
                    final long sendTime = System.nanoTime();

                    byte[] payloadData;

                    if (this.payloadFilename != null) {
                        if (messageFormatter != null) {
                            payloadData = messageFormatter.formatMessage(this.producerName, totalSent.get(),
                                    payloadByteList.get(ThreadLocalRandom.current().nextInt(payloadByteList.size())));
                        } else {
                            payloadData = payloadByteList.get(
                                    ThreadLocalRandom.current().nextInt(payloadByteList.size()));
                        }
                    } else {
                        payloadData = payloadBytes;
                    }
                    TypedMessageBuilder<byte[]> messageBuilder;
                    if (this.isEnableTransaction) {
                        if (this.numMessagesPerTransaction > 0) {
                            try {
                                numMsgPerTxnLimit.acquire();
                            } catch (InterruptedException exception){
                                log.error("Get exception: ", exception);
                                // An interrupt is a request to stop: do not send without a permit,
                                // leave the loop and let the finally block clean up.
                                interrupted = true;
                                return;
                            }
                        }
                        messageBuilder = producer.newMessage(transaction)
                                .value(payloadData);
                    } else {
                        messageBuilder = producer.newMessage()
                                .value(payloadData);
                    }
                    if (this.delay > 0) {
                        messageBuilder.deliverAfter(this.delay, TimeUnit.SECONDS);
                    } else if (this.delayRange != null) {
                        final long deliverAfter = ThreadLocalRandom.current()
                                .nextLong(this.delayRange.lowerEndpoint(), this.delayRange.upperEndpoint());
                        messageBuilder.deliverAfter(deliverAfter, TimeUnit.SECONDS);
                    }
                    if (this.setEventTime) {
                        messageBuilder.eventTime(System.currentTimeMillis());
                    }
                    //generate msg key
                    if (msgKeyMode == MessageKeyGenerationMode.random) {
                        messageBuilder.key(String.valueOf(ThreadLocalRandom.current().nextInt()));
                    } else if (msgKeyMode == MessageKeyGenerationMode.autoIncrement) {
                        messageBuilder.key(String.valueOf(totalSent.get()));
                    }
                    PulsarClient pulsarClient = client;
                    messageBuilder.sendAsync().thenRun(() -> {
                        bytesSent.add(payloadData.length);
                        messagesSent.increment();
                        totalSent.incrementAndGet();
                        totalMessagesSent.increment();
                        totalBytesSent.add(payloadData.length);

                        // Latency is only recorded once the warm-up window has passed.
                        long now = System.nanoTime();
                        if (now > warmupEndTime) {
                            long latencyMicros = NANOSECONDS.toMicros(now - sendTime);
                            recorder.recordValue(latencyMicros);
                            cumulativeRecorder.recordValue(latencyMicros);
                        }
                    }).exceptionally(ex -> {
                        // Ignore the exception of recorder since a very large latencyMicros will lead
                        // ArrayIndexOutOfBoundsException in AbstractHistogram
                        if (ex.getCause() instanceof ArrayIndexOutOfBoundsException) {
                            return null;
                        }
                        log.warn("Write message error with exception", ex);
                        messagesFailed.increment();
                        if (this.exitOnFailure) {
                            PerfClientUtils.exit(1);
                        }
                        return null;
                    });
                    // Once the per-transaction quota has been issued, end this transaction and open a new one.
                    if (this.isEnableTransaction
                            && numMessageSend.incrementAndGet() == this.numMessagesPerTransaction) {
                        if (!this.isAbortTransaction) {
                            transaction.commit()
                                    .thenRun(() -> {
                                        if (log.isDebugEnabled()) {
                                            log.debug("Committed transaction {}",
                                                    transaction.getTxnID().toString());
                                        }
                                        totalEndTxnOpSuccessNum.increment();
                                        numTxnOpSuccess.increment();
                                    })
                                    .exceptionally(exception -> {
                                        log.error("Commit transaction failed with exception : ",
                                                exception);
                                        totalEndTxnOpFailNum.increment();
                                        return null;
                                    });
                        } else {
                            transaction.abort().thenRun(() -> {
                                if (log.isDebugEnabled()) {
                                    log.debug("Abort transaction {}", transaction.getTxnID().toString());
                                }
                                totalEndTxnOpSuccessNum.increment();
                                numTxnOpSuccess.increment();
                            }).exceptionally(exception -> {
                                log.error("Abort transaction {} failed with exception",
                                        transaction.getTxnID().toString(),
                                        exception);
                                totalEndTxnOpFailNum.increment();
                                return null;
                            });
                        }
                        // Retry until a replacement transaction is opened; only an interrupt ends the retries.
                        while (true) {
                            try {
                                Transaction newTransaction = pulsarClient.newTransaction()
                                        .withTransactionTimeout(this.transactionTimeout,
                                                TimeUnit.SECONDS).build().get();
                                transactionAtomicReference.compareAndSet(transaction, newTransaction);
                                numMessageSend.set(0);
                                numMsgPerTxnLimit.release(this.numMessagesPerTransaction);
                                totalNumTxnOpenTxnSuccess.increment();
                                break;
                            } catch (InterruptedException e) {
                                // Do not retry after an interrupt: stop producing instead.
                                totalNumTxnOpenTxnFail.increment();
                                log.error("Failed to new transaction with exception: ", e);
                                interrupted = true;
                                return;
                            } catch (Exception e){
                                totalNumTxnOpenTxnFail.increment();
                                log.error("Failed to new transaction with exception: ", e);
                            }
                        }
                    }
                }
            }
        } catch (Throwable t) {
            if (t instanceof InterruptedException) {
                interrupted = true;
            }
            log.error("Got error", t);
        } finally {
            // The done paths above already counted the latch down; cover every other exit here.
            if (!produceEnough) {
                doneLatch.countDown();
            }
            if (null != client) {
                try {
                    client.close();
                } catch (PulsarClientException e) {
                    log.error("Failed to close test client", e);
                }
            }
            // Restore the interrupt status only after cleanup, so the close above runs with the
            // flag clear while callers of this thread can still observe the interruption.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }