public async Task Send()

in rocketmq-client-csharp/Producer.cs [89:200]


        public async Task<SendReceipt> Send(Message message)
        {
            if (!_loadBalancer.ContainsKey(message.Topic))
            {
                var topicRouteData = await GetRouteFor(message.Topic, false);
                if (null == topicRouteData || null == topicRouteData.MessageQueues || 0 == topicRouteData.MessageQueues.Count)
                {
                    Logger.Error($"Failed to resolve route info for {message.Topic}");
                    throw new TopicRouteException(string.Format("No topic route for {0}", message.Topic));
                }

                var loadBalancerItem = new PublishLoadBalancer(topicRouteData);
                _loadBalancer.TryAdd(message.Topic, loadBalancerItem);
            }

            var publishLb = _loadBalancer[message.Topic];

            var request = new rmq::SendMessageRequest();
            var entry = new rmq::Message();
            entry.Body = ByteString.CopyFrom(message.Body);
            entry.Topic = new rmq::Resource();
            entry.Topic.ResourceNamespace = resourceNamespace();
            entry.Topic.Name = message.Topic;
            request.Messages.Add(entry);

            // User properties
            foreach (var item in message.UserProperties)
            {
                entry.UserProperties.Add(item.Key, item.Value);
            }

            entry.SystemProperties = new rmq::SystemProperties();
            entry.SystemProperties.MessageId = message.MessageId;
            entry.SystemProperties.MessageType = rmq::MessageType.Normal;
            if (DateTime.MinValue != message.DeliveryTimestamp)
            {
                entry.SystemProperties.MessageType = rmq::MessageType.Delay;
                entry.SystemProperties.DeliveryTimestamp = Timestamp.FromDateTime(message.DeliveryTimestamp);

                if (message.Fifo())
                {
                    Logger.Warn("A message may not be FIFO and delayed at the same time");
                    throw new MessageException("A message may not be both FIFO and Timed");
                }
            } else if (!String.IsNullOrEmpty(message.MessageGroup))
            {
                entry.SystemProperties.MessageType = rmq::MessageType.Fifo;
                entry.SystemProperties.MessageGroup = message.MessageGroup;
            }
            
            if (!string.IsNullOrEmpty(message.Tag))
            {
                entry.SystemProperties.Tag = message.Tag;
            }

            if (0 != message.Keys.Count)
            {
                foreach (var key in message.Keys)
                {
                    entry.SystemProperties.Keys.Add(key);
                }
            }

            List<string> targets = new List<string>();
            List<rmq::MessageQueue> candidates = publishLb.Select(message.MaxAttemptTimes);
            foreach (var messageQueue in candidates)
            {
                targets.Add(Utilities.TargetUrl(messageQueue));
            }

            var metadata = new Metadata();
            Signature.sign(this, metadata);

            Exception ex = null;

            foreach (var target in targets)
            {
                try
                {
                    var stopWatch = new Stopwatch();
                    stopWatch.Start();
                    rmq::SendMessageResponse response = await Manager.SendMessage(target, metadata, request, RequestTimeout);
                    if (null != response && rmq::Code.Ok == response.Status.Code)
                    {
                        var messageId = response.Entries[0].MessageId;
                        
                        // Account latency histogram
                        stopWatch.Stop();
                        var latency = stopWatch.ElapsedMilliseconds;
                        _sendLatency.Record(latency, new("topic", message.Topic), new("client_id", clientId()));
                        
                        return new SendReceipt(messageId);
                    }
                }
                catch (Exception e)
                {
                    // Account failure count
                    _sendFailureTotal.Add(1, new("topic", message.Topic), new("client_id", clientId()));                    
                    Logger.Info(e, $"Failed to send message to {target}");
                    ex = e;
                }
            }

            if (null != ex)
            {
                Logger.Error(ex, $"Failed to send message after {message.MaxAttemptTimes} attempts");
                throw ex;
            }

            Logger.Error($"Failed to send message after {message.MaxAttemptTimes} attempts with unspecified reasons");
            throw new Exception("Send message failed");
        }