private void send0()

in java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java [435:543]


    /**
     * Sends {@code messages} to one of the {@code candidates} and completes {@code future0} with the outcome,
     * re-invoking itself with the next attempt number when the send fails and further attempts are allowed.
     *
     * <p>The target queue is {@code candidates.get((attempt - 1) mod candidates.size())}, so successive attempts
     * rotate through the candidates. On failure the endpoints of the chosen queue are isolated, then:
     * <ul>
     *   <li>if {@code attempt >= maxAttempts} (from the retry policy), {@code future0} fails with the cause;</li>
     *   <li>if the message type is {@link MessageType#TRANSACTION}, {@code future0} fails with the cause without
     *       any further attempt;</li>
     *   <li>if the cause is not a {@link TooManyRequestsException}, the next attempt is started immediately;</li>
     *   <li>otherwise the next attempt is scheduled after the delay returned by the retry policy.</li>
     * </ul>
     *
     * <p>The SEND interceptor hooks are invoked once per attempt: {@code doBefore} before the request is
     * dispatched, and {@code doAfter} (with status OK or ERROR) before {@code future0} is completed or the next
     * attempt is started. The completion callback runs on {@code clientCallbackExecutor}.
     *
     * @param future0     future completed with the send receipts on success, or with the failure cause.
     * @param topic       topic name; used in this method only for the error message and for logging.
     * @param messageType type of the messages; checked against the queue's accepted types when message type
     *                    validation is enabled in the publishing settings.
     * @param candidates  message queues to choose from; must not be empty.
     * @param messages    messages to publish in a single request.
     * @param attempt     attempt number, starting from 1 for the first attempt.
     */
    private void send0(SettableFuture<List<SendReceiptImpl>> future0, String topic, MessageType messageType,
        final List<MessageQueueImpl> candidates, final List<PublishingMessageImpl> messages, final int attempt) {
        // Calculate the current message queue.
        final MessageQueueImpl mq = candidates.get(IntMath.mod(attempt - 1, candidates.size()));
        final List<MessageType> acceptMessageTypes = mq.getAcceptMessageTypes();
        if (publishingSettings.isValidateMessageType() && !acceptMessageTypes.contains(messageType)) {
            final IllegalArgumentException e = new IllegalArgumentException("Current message type not match with "
                + "topic accept message types, topic=" + topic + ", actualMessageType=" + messageType + ", "
                + "acceptMessageTypes=" + acceptMessageTypes);
            future0.setException(e);
            return;
        }
        final Endpoints endpoints = mq.getBroker().getEndpoints();
        final int maxAttempts = this.getRetryPolicy().getMaxAttempts();

        // Intercept before message publishing. This must happen before the request is dispatched below,
        // otherwise the "before" hook would observe an already in-flight send.
        final List<GeneralMessage> generalMessages = messages.stream().map((Function<PublishingMessageImpl,
            GeneralMessage>) GeneralMessageImpl::new).collect(Collectors.toList());
        final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.SEND);
        doBefore(context, generalMessages);

        // Dispatch the request to the selected endpoints.
        final ListenableFuture<List<SendReceiptImpl>> future = send0(endpoints, messages, mq);

        Futures.addCallback(future, new FutureCallback<List<SendReceiptImpl>>() {
            @Override
            public void onSuccess(List<SendReceiptImpl> sendReceipts) {
                // Should never reach here.
                if (sendReceipts.size() != messages.size()) {
                    final InternalErrorException e = new InternalErrorException("[Bug] due to an"
                        + " unknown reason from remote, received send receipt's quantity " + sendReceipts.size()
                        + " is not equal to sent message's quantity " + messages.size());

                    // Intercept after message publishing, before the result future is completed, which is the
                    // same order used by the success path below and by onFailure.
                    final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context,
                        MessageHookPointsStatus.ERROR);
                    doAfter(context0, generalMessages);

                    future0.setException(e);
                    return;
                }
                // Intercept after message publishing.
                final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context,
                    MessageHookPointsStatus.OK);
                doAfter(context0, generalMessages);

                // No need more attempts.
                future0.set(sendReceipts);
                // Resend message(s) successfully.
                if (1 < attempt) {
                    // Collect messageId(s) for logging.
                    List<MessageId> messageIds = new ArrayList<>();
                    for (SendReceipt receipt : sendReceipts) {
                        messageIds.add(receipt.getMessageId());
                    }
                    log.info("Resend message successfully, topic={}, messageId(s)={}, maxAttempts={}, "
                            + "attempt={}, endpoints={}, clientId={}", topic, messageIds, maxAttempts, attempt,
                        endpoints, clientId);
                }
                // Send message(s) successfully on first attempt, return directly.
            }

            @Override
            public void onFailure(Throwable t) {
                // Intercept after message publishing.
                final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context,
                    MessageHookPointsStatus.ERROR);
                doAfter(context0, generalMessages);

                // Collect messageId(s) for logging.
                List<MessageId> messageIds = new ArrayList<>();
                for (PublishingMessageImpl message : messages) {
                    messageIds.add(message.getMessageId());
                }
                // Isolate endpoints because of sending failure.
                isolate(endpoints);
                if (attempt >= maxAttempts) {
                    // No need more attempts.
                    future0.setException(t);
                    log.error("Failed to send message(s) finally, run out of attempt times, maxAttempts={}, " +
                            "attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}",
                        maxAttempts, attempt, topic, messageIds, endpoints, clientId, t);
                    return;
                }
                // No need more attempts for transactional message.
                if (MessageType.TRANSACTION.equals(messageType)) {
                    future0.setException(t);
                    log.error("Failed to send transactional message finally, maxAttempts=1, attempt={}, " +
                            "topic={}, messageId(s)={}, endpoints={}, clientId={}", attempt, topic, messageIds,
                        endpoints, clientId, t);
                    return;
                }
                // Try to do more attempts.
                int nextAttempt = 1 + attempt;
                // Retry immediately if the request is not throttled.
                if (!(t instanceof TooManyRequestsException)) {
                    log.warn("Failed to send message, would attempt to resend right now, maxAttempts={}, "
                            + "attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", maxAttempts, attempt,
                        topic, messageIds, endpoints, clientId, t);
                    send0(future0, topic, messageType, candidates, messages, nextAttempt);
                    return;
                }
                // Throttled by the remote: back off for the delay given by the retry policy before resending.
                final Duration delay = ProducerImpl.this.getRetryPolicy().getNextAttemptDelay(nextAttempt);
                log.warn("Failed to send message due to too many requests, would attempt to resend after {}, "
                        + "maxAttempts={}, attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", delay,
                    maxAttempts, attempt, topic, messageIds, endpoints, clientId, t);
                ProducerImpl.this.getClientManager().getScheduler().schedule(() -> send0(future0, topic, messageType,
                    candidates, messages, nextAttempt), delay.toNanos(), TimeUnit.NANOSECONDS);
            }
        }, clientCallbackExecutor);
    }