private async Task Send0()

in csharp/rocketmq-client-csharp/Producer.cs [203:291]


        /// <summary>
        /// Sends <paramref name="message"/> to one of the <paramref name="candidates"/> queues, re-sending on
        /// failure until the maximum number of attempts given by the retry policy is reached.
        /// </summary>
        /// <remarks>
        /// Attempt <c>n</c> (1-based) targets <c>candidates[(n - 1) % candidates.Count]</c>. Every failed attempt
        /// marks the endpoints of the attempted queue as isolated. Transaction messages are never re-sent. A
        /// <see cref="TooManyRequestsException"/> is re-sent after the delay supplied by the retry policy; any other
        /// failure is re-sent immediately. Each attempt that reaches the send call records its own duration and
        /// outcome in the send-cost histogram.
        /// </remarks>
        /// <param name="message">The message to publish.</param>
        /// <param name="candidates">Message queues to try, in order; must contain at least one queue.</param>
        /// <returns>The first send receipt produced from the response of the successful attempt.</returns>
        /// <exception cref="ArgumentException">
        /// <paramref name="candidates"/> is null or empty, or message type validation is enabled and the selected
        /// queue does not accept the type of <paramref name="message"/>.
        /// </exception>
        private async Task<SendReceipt> Send0(PublishingMessage message, List<MessageQueue> candidates)
        {
            // Fail with a descriptive error instead of a NullReferenceException/DivideByZeroException from the
            // candidate selection below.
            if (null == candidates || candidates.Count == 0)
            {
                throw new ArgumentException(
                    $"No candidate message queue to send message, topic={message.Topic}, clientId={ClientId}",
                    nameof(candidates));
            }

            var retryPolicy = GetRetryPolicy();
            var maxAttempts = retryPolicy.GetMaxAttempts();
            // Last failure seen; only used as the inner exception of the fallback throw after the loop.
            Exception exception = null;
            for (var attempt = 1; attempt <= maxAttempts; attempt++)
            {
                var stopwatch = Stopwatch.StartNew();
                // Outcome of THIS attempt only, so that a successful re-send is not reported as a failure just
                // because an earlier attempt failed.
                var attemptFailed = false;

                // Walk through the candidates in order, wrapping around when attempts outnumber candidates.
                var candidateIndex = (attempt - 1) % candidates.Count;
                var mq = candidates[candidateIndex];
                if (PublishingSettings.IsValidateMessageType() && !mq.AcceptMessageTypes.Contains(message.MessageType))
                {
                    throw new ArgumentException(
                        "Current message type does not match with the accept message types," +
                        $" topic={message.Topic}, actualMessageType={message.MessageType}" +
                        $" acceptMessageType={string.Join(",", mq.AcceptMessageTypes)}");
                }

                var sendMessageRequest = WrapSendMessageRequest(message, mq);
                var endpoints = mq.Broker.Endpoints;
                try
                {
                    var invocation =
                        await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout);
                    var sendReceipts = SendReceipt.ProcessSendMessageResponse(mq, invocation);

                    var sendReceipt = sendReceipts.First();
                    if (attempt > 1)
                    {
                        Logger.Info(
                            $"Re-send message successfully, topic={message.Topic}, messageId={sendReceipt.MessageId}," +
                            $" maxAttempts={maxAttempts}, endpoints={endpoints}, clientId={ClientId}");
                    }

                    return sendReceipt;
                }
                catch (Exception e)
                {
                    // Freeze the measurement now so that the back-off delay below is not counted as send cost.
                    stopwatch.Stop();
                    attemptFailed = true;
                    exception = e;

                    // Isolate current endpoints.
                    Isolated[endpoints] = true;
                    if (attempt >= maxAttempts)
                    {
                        Logger.Error(e, "Failed to send message finally, run out of attempt times, " +
                                        $"topic={message.Topic}, maxAttempt={maxAttempts}, attempt={attempt}, " +
                                        $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
                        throw;
                    }

                    // Transaction messages are never re-sent, regardless of the retry policy.
                    if (MessageType.Transaction == message.MessageType)
                    {
                        Logger.Error(e, "Failed to send transaction message, run out of attempt times, " +
                                        $"topic={message.Topic}, maxAttempt=1, attempt={attempt}, " +
                                        $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
                        throw;
                    }

                    if (!(e is TooManyRequestsException))
                    {
                        // Retry immediately if the request is not throttled.
                        Logger.Warn(e, $"Failed to send message, topic={message.Topic}, maxAttempts={maxAttempts}, " +
                                       $"attempt={attempt}, endpoints={endpoints}, messageId={message.MessageId}," +
                                       $" clientId={ClientId}");
                        continue;
                    }

                    // Throttled: announce the back-off first, then wait before the next attempt.
                    var nextAttempt = 1 + attempt;
                    var delay = retryPolicy.GetNextAttemptDelay(nextAttempt);
                    Logger.Warn(e, "Failed to send message due to too many request, would attempt to resend " +
                                   $"after {delay}, topic={message.Topic}, maxAttempts={maxAttempts}, attempt={attempt}, " +
                                   $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
                    await Task.Delay(delay);
                }
                finally
                {
                    // ElapsedMilliseconds is the total elapsed time; Elapsed.Milliseconds is only the 0-999
                    // millisecond component and wraps for attempts lasting one second or longer. The cast keeps the
                    // recorded value an int, as before, because the histogram's value type is not visible here.
                    var elapsed = (int)stopwatch.ElapsedMilliseconds;
                    _sendCostTimeHistogram.Record(elapsed,
                        new KeyValuePair<string, object>(MetricConstant.Topic, message.Topic),
                        new KeyValuePair<string, object>(MetricConstant.ClientId, ClientId),
                        new KeyValuePair<string, object>(MetricConstant.InvocationStatus,
                            attemptFailed ? MetricConstant.Failure : MetricConstant.Success));
                }
            }

            // Reached only when the loop body never ran (the retry policy allows no attempt at all).
            throw new Exception($"Failed to send message finally, topic={message.Topic}, clientId={ClientId}",
                exception);
        }