in java/e2e-v4/src/main/java/org/apache/rocketmq/client/rmq/RMQNormalProducer.java [141:169]
/**
 * Sends {@code messageNum} messages to {@code topic} through the producer's asynchronous
 * send API, blocking on {@code callBack.waitResponse()} after each send before issuing the
 * next one.
 *
 * <p>When the callback reports success, a {@code MessageExt} carrying the callback's message
 * id is added to {@code enqueueMessages}. When it reports failure, a {@code MessageExt}
 * carrying the topic and body of the message that failed is added to
 * {@code enqueueFailedMessages}. Client and remoting errors raised by the send are logged and
 * the loop continues with the next message; an interrupt restores the thread's interrupt
 * flag and stops sending.
 *
 * @param topic      topic every message is built for and sent to
 * @param callBack   callback passed to each send and then polled for the outcome
 * @param messageNum number of messages to send; nothing is sent when it is not positive
 */
public void sendAsync(String topic, RMQSendCallBack callBack, int messageNum) {
    logger.info("Producer start to async send messages");
    for (int i = 0; i < messageNum; i++) {
        Message message = MessageFactory.buildOneMessage(topic);
        try {
            producer.send(message, callBack);
            // NOTE(review): the same callback instance is reused for every send; this presumably
            // relies on waitResponse() resetting the success/fail flags - confirm in RMQSendCallBack.
            callBack.waitResponse();
            if (callBack.isbSuccessResponse()) {
                MessageExt sent = new MessageExt();
                sent.setMsgId(callBack.getMessageId());
                this.enqueueMessages.addData(sent);
            }
            if (callBack.isbFailResponse()) {
                // Record what failed instead of a null entry, so the failure summary
                // logged below actually identifies the message.
                MessageExt failed = new MessageExt();
                failed.setTopic(message.getTopic());
                failed.setBody(message.getBody());
                this.enqueueFailedMessages.addData(failed);
            }
        } catch (MQClientException | RemotingException e) {
            logger.error("Async send to topic {} failed", topic, e);
        } catch (InterruptedException e) {
            // Restore the flag for callers and stop: further blocking waits on this
            // thread would be interrupted again immediately.
            Thread.currentThread().interrupt();
            logger.error("Interrupted while async sending to topic {}", topic, e);
            break;
        }
    }
    logger.info("Producer async send messages finished");
    if (enqueueFailedMessages.getAllData().size() > 0) {
        logger.warn("send failed messages: {}", enqueueFailedMessages.getAllData());
    }
}