in java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java [458:558]
private void send0(SettableFuture<List<SendReceiptImpl>> future0, String topic, MessageType messageType,
final List<MessageQueueImpl> candidates, final List<PublishingMessageImpl> messages, final int attempt) {
// Calculate the current message queue.
final MessageQueueImpl mq = candidates.get(IntMath.mod(attempt - 1, candidates.size()));
final List<MessageType> acceptMessageTypes = mq.getAcceptMessageTypes();
if (publishingSettings.isValidateMessageType() && !acceptMessageTypes.contains(messageType)) {
final IllegalArgumentException e = new IllegalArgumentException("Current message type not match with "
+ "topic accept message types, topic=" + topic + ", actualMessageType=" + messageType + ", "
+ "acceptMessageTypes=" + acceptMessageTypes);
future0.setException(e);
return;
}
final Endpoints endpoints = mq.getBroker().getEndpoints();
final ListenableFuture<List<SendReceiptImpl>> future = send0(endpoints, messages, mq);
final int maxAttempts = this.getRetryPolicy().getMaxAttempts();
// Intercept before message publishing.
final List<GeneralMessage> generalMessages = messages.stream().map((Function<PublishingMessageImpl,
GeneralMessage>) GeneralMessageImpl::new).collect(Collectors.toList());
final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.SEND);
doBefore(context, generalMessages);
Futures.addCallback(future, new FutureCallback<List<SendReceiptImpl>>() {
@Override
public void onSuccess(List<SendReceiptImpl> sendReceipts) {
// Should never reach here.
if (sendReceipts.size() != messages.size()) {
final InternalErrorException e = new InternalErrorException("[Bug] due to an"
+ " unknown reason from remote, received send receipt's quantity " + sendReceipts.size()
+ " is not equal to sent message's quantity " + messages.size());
future0.setException(e);
// Intercept after message publishing.
final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context,
MessageHookPointsStatus.ERROR);
doAfter(context0, generalMessages);
return;
}
// Intercept after message publishing.
final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context,
MessageHookPointsStatus.OK);
doAfter(context0, generalMessages);
// No need more attempts.
future0.set(sendReceipts);
// Resend message(s) successfully.
if (1 < attempt) {
// Collect messageId(s) for logging.
List<MessageId> messageIds = new ArrayList<>();
for (SendReceipt receipt : sendReceipts) {
messageIds.add(receipt.getMessageId());
}
log.info("Resend message successfully, topic={}, messageId(s)={}, maxAttempts={}, "
+ "attempt={}, endpoints={}, clientId={}", topic, messageIds, maxAttempts, attempt,
endpoints, clientId);
}
// Send message(s) successfully on first attempt, return directly.
}
@Override
public void onFailure(Throwable t) {
// Intercept after message publishing.
final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context,
MessageHookPointsStatus.ERROR);
doAfter(context0, generalMessages);
// Collect messageId(s) for logging.
List<MessageId> messageIds = new ArrayList<>();
for (PublishingMessageImpl message : messages) {
messageIds.add(message.getMessageId());
}
// Isolate endpoints because of sending failure.
isolate(endpoints);
if (attempt >= maxAttempts) {
// No need more attempts.
future0.setException(t);
log.error("Failed to send message(s) finally, run out of attempt times, maxAttempts={}, " +
"attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}",
maxAttempts, attempt, topic, messageIds, endpoints, clientId, t);
return;
}
// Try to do more attempts.
int nextAttempt = 1 + attempt;
// Retry immediately if the request is not throttled.
if (!(t instanceof TooManyRequestsException)) {
log.warn("Failed to send message, would attempt to resend right now, maxAttempts={}, "
+ "attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", maxAttempts, attempt,
topic, messageIds, endpoints, clientId, t);
send0(future0, topic, messageType, candidates, messages, nextAttempt);
return;
}
final Duration delay = ProducerImpl.this.getRetryPolicy().getNextAttemptDelay(nextAttempt);
log.warn("Failed to send message due to too many requests, would attempt to resend after {}, "
+ "maxAttempts={}, attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", delay,
maxAttempts, attempt, topic, messageIds, endpoints, clientId, t);
ProducerImpl.this.getClientManager().getScheduler().schedule(() -> send0(future0, topic, messageType,
candidates, messages, nextAttempt), delay.toNanos(), TimeUnit.NANOSECONDS);
}
}, clientCallbackExecutor);
}