in pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java [483:720]
private void runProducer(int producerId,
PerformanceProducer arguments,
long numMessages,
int msgRate,
List<byte[]> payloadByteList,
byte[] payloadBytes,
CountDownLatch doneLatch) {
PulsarClient client = null;
boolean produceEnough = false;
try {
// Now processing command line arguments
List<Future<Producer<byte[]>>> futures = new ArrayList<>();
ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments)
.enableTransaction(this.isEnableTransaction);
client = clientBuilder.build();
ProducerBuilder<byte[]> producerBuilder = createProducerBuilder(client, producerId);
AtomicReference<Transaction> transactionAtomicReference;
if (this.isEnableTransaction) {
producerBuilder.sendTimeout(0, TimeUnit.SECONDS);
transactionAtomicReference = new AtomicReference<>(client.newTransaction()
.withTransactionTimeout(this.transactionTimeout, TimeUnit.SECONDS)
.build()
.get());
} else {
transactionAtomicReference = new AtomicReference<>(null);
}
for (int i = 0; i < this.numTopics; i++) {
String topic = this.topics.get(i);
log.info("Adding {} publishers on topic {}", this.numProducers, topic);
for (int j = 0; j < this.numProducers; j++) {
ProducerBuilder<byte[]> prodBuilder = producerBuilder.clone().topic(topic);
if (this.chunkingAllowed) {
prodBuilder.enableChunking(true);
prodBuilder.enableBatching(false);
}
futures.add(prodBuilder.createAsync());
}
}
final List<Producer<byte[]>> producers = new ArrayList<>(futures.size());
for (Future<Producer<byte[]>> future : futures) {
producers.add(future.get());
}
Collections.shuffle(producers);
log.info("Created {} producers", producers.size());
RateLimiter rateLimiter = RateLimiter.create(msgRate);
long startTime = System.nanoTime();
long warmupEndTime = startTime + (long) (this.warmupTimeSeconds * 1e9);
long testEndTime = startTime + (long) (this.testTime * 1e9);
MessageKeyGenerationMode msgKeyMode = null;
if (isNotBlank(this.messageKeyGenerationMode)) {
try {
msgKeyMode = MessageKeyGenerationMode.valueOf(this.messageKeyGenerationMode);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("messageKeyGenerationMode only support [autoIncrement, random]");
}
}
// Send messages on all topics/producers
AtomicLong totalSent = new AtomicLong(0);
AtomicLong numMessageSend = new AtomicLong(0);
Semaphore numMsgPerTxnLimit = new Semaphore(this.numMessagesPerTransaction);
while (true) {
if (produceEnough) {
break;
}
for (Producer<byte[]> producer : producers) {
if (this.testTime > 0) {
if (System.nanoTime() > testEndTime) {
log.info("------------- DONE (reached the maximum duration: [{} seconds] of production) "
+ "--------------", this.testTime);
doneLatch.countDown();
produceEnough = true;
break;
}
}
if (numMessages > 0) {
if (totalSent.get() >= numMessages) {
log.info("------------- DONE (reached the maximum number: {} of production) --------------"
, numMessages);
doneLatch.countDown();
produceEnough = true;
break;
}
}
rateLimiter.acquire();
//if transaction is disable, transaction will be null.
Transaction transaction = transactionAtomicReference.get();
final long sendTime = System.nanoTime();
byte[] payloadData;
if (this.payloadFilename != null) {
if (messageFormatter != null) {
payloadData = messageFormatter.formatMessage(this.producerName, totalSent.get(),
payloadByteList.get(ThreadLocalRandom.current().nextInt(payloadByteList.size())));
} else {
payloadData = payloadByteList.get(
ThreadLocalRandom.current().nextInt(payloadByteList.size()));
}
} else {
payloadData = payloadBytes;
}
TypedMessageBuilder<byte[]> messageBuilder;
if (this.isEnableTransaction) {
if (this.numMessagesPerTransaction > 0) {
try {
numMsgPerTxnLimit.acquire();
} catch (InterruptedException exception){
log.error("Get exception: ", exception);
}
}
messageBuilder = producer.newMessage(transaction)
.value(payloadData);
} else {
messageBuilder = producer.newMessage()
.value(payloadData);
}
if (this.delay > 0) {
messageBuilder.deliverAfter(this.delay, TimeUnit.SECONDS);
} else if (this.delayRange != null) {
final long deliverAfter = ThreadLocalRandom.current()
.nextLong(this.delayRange.lowerEndpoint(), this.delayRange.upperEndpoint());
messageBuilder.deliverAfter(deliverAfter, TimeUnit.SECONDS);
}
if (this.setEventTime) {
messageBuilder.eventTime(System.currentTimeMillis());
}
//generate msg key
if (msgKeyMode == MessageKeyGenerationMode.random) {
messageBuilder.key(String.valueOf(ThreadLocalRandom.current().nextInt()));
} else if (msgKeyMode == MessageKeyGenerationMode.autoIncrement) {
messageBuilder.key(String.valueOf(totalSent.get()));
}
PulsarClient pulsarClient = client;
messageBuilder.sendAsync().thenRun(() -> {
bytesSent.add(payloadData.length);
messagesSent.increment();
totalSent.incrementAndGet();
totalMessagesSent.increment();
totalBytesSent.add(payloadData.length);
long now = System.nanoTime();
if (now > warmupEndTime) {
long latencyMicros = NANOSECONDS.toMicros(now - sendTime);
recorder.recordValue(latencyMicros);
cumulativeRecorder.recordValue(latencyMicros);
}
}).exceptionally(ex -> {
// Ignore the exception of recorder since a very large latencyMicros will lead
// ArrayIndexOutOfBoundsException in AbstractHistogram
if (ex.getCause() instanceof ArrayIndexOutOfBoundsException) {
return null;
}
log.warn("Write message error with exception", ex);
messagesFailed.increment();
if (this.exitOnFailure) {
PerfClientUtils.exit(1);
}
return null;
});
if (this.isEnableTransaction
&& numMessageSend.incrementAndGet() == this.numMessagesPerTransaction) {
if (!this.isAbortTransaction) {
transaction.commit()
.thenRun(() -> {
if (log.isDebugEnabled()) {
log.debug("Committed transaction {}",
transaction.getTxnID().toString());
}
totalEndTxnOpSuccessNum.increment();
numTxnOpSuccess.increment();
})
.exceptionally(exception -> {
log.error("Commit transaction failed with exception : ",
exception);
totalEndTxnOpFailNum.increment();
return null;
});
} else {
transaction.abort().thenRun(() -> {
if (log.isDebugEnabled()) {
log.debug("Abort transaction {}", transaction.getTxnID().toString());
}
totalEndTxnOpSuccessNum.increment();
numTxnOpSuccess.increment();
}).exceptionally(exception -> {
log.error("Abort transaction {} failed with exception",
transaction.getTxnID().toString(),
exception);
totalEndTxnOpFailNum.increment();
return null;
});
}
while (true) {
try {
Transaction newTransaction = pulsarClient.newTransaction()
.withTransactionTimeout(this.transactionTimeout,
TimeUnit.SECONDS).build().get();
transactionAtomicReference.compareAndSet(transaction, newTransaction);
numMessageSend.set(0);
numMsgPerTxnLimit.release(this.numMessagesPerTransaction);
totalNumTxnOpenTxnSuccess.increment();
break;
} catch (Exception e){
totalNumTxnOpenTxnFail.increment();
log.error("Failed to new transaction with exception: ", e);
}
}
}
}
}
} catch (Throwable t) {
log.error("Got error", t);
} finally {
if (!produceEnough) {
doneLatch.countDown();
}
if (null != client) {
try {
client.close();
} catch (PulsarClientException e) {
log.error("Failed to close test client", e);
}
}
}
}