in rocketmq-client-csharp/Producer.cs [89:200]
/// <summary>
/// Sends a single message to its topic, trying each candidate message queue picked by the
/// topic's publish load balancer in order until one broker answers with <c>Code.Ok</c>.
/// </summary>
/// <param name="message">
/// The message to publish. Its topic, body, user properties, tag, keys, message group and
/// delivery timestamp are copied into the outgoing gRPC request.
/// </param>
/// <returns>
/// A <see cref="SendReceipt"/> built from the message id of the first entry of the successful response.
/// </returns>
/// <exception cref="TopicRouteException">
/// No route, or a route without message queues, could be resolved for the topic.
/// </exception>
/// <exception cref="MessageException">
/// The message carries a delivery timestamp and <c>message.Fifo()</c> reports true.
/// </exception>
/// <exception cref="Exception">
/// Every attempt failed. When at least one attempt threw, the last such exception is rethrown
/// with its original stack trace; otherwise a generic exception is raised.
/// </exception>
public async Task<SendReceipt> Send(Message message)
{
    // Resolve (and cache) the publish load balancer for this topic with a single lookup.
    if (!_loadBalancer.TryGetValue(message.Topic, out var publishLb))
    {
        var topicRouteData = await GetRouteFor(message.Topic, false);
        if (null == topicRouteData || null == topicRouteData.MessageQueues || 0 == topicRouteData.MessageQueues.Count)
        {
            Logger.Error($"Failed to resolve route info for {message.Topic}");
            throw new TopicRouteException($"No topic route for {message.Topic}");
        }

        var created = new PublishLoadBalancer(topicRouteData);
        // If another caller registered a balancer first, use theirs; if the entry cannot be
        // read back for any reason, fall back to the one just built instead of throwing.
        if (_loadBalancer.TryAdd(message.Topic, created) || !_loadBalancer.TryGetValue(message.Topic, out publishLb))
        {
            publishLb = created;
        }
    }

    var request = new rmq::SendMessageRequest();
    var entry = new rmq::Message();
    entry.Body = ByteString.CopyFrom(message.Body);
    entry.Topic = new rmq::Resource();
    entry.Topic.ResourceNamespace = resourceNamespace();
    entry.Topic.Name = message.Topic;
    request.Messages.Add(entry);

    // User properties
    foreach (var item in message.UserProperties)
    {
        entry.UserProperties.Add(item.Key, item.Value);
    }

    entry.SystemProperties = new rmq::SystemProperties();
    entry.SystemProperties.MessageId = message.MessageId;
    entry.SystemProperties.MessageType = rmq::MessageType.Normal;

    if (DateTime.MinValue != message.DeliveryTimestamp)
    {
        // Reject the conflicting combination before touching the outgoing entry.
        if (message.Fifo())
        {
            Logger.Warn("A message may not be FIFO and delayed at the same time");
            throw new MessageException("A message may not be both FIFO and Timed");
        }

        entry.SystemProperties.MessageType = rmq::MessageType.Delay;
        // Timestamp.FromDateTime only accepts DateTimeKind.Utc and throws otherwise, so
        // normalize first. NOTE: ToUniversalTime treats an Unspecified kind as local time.
        entry.SystemProperties.DeliveryTimestamp = Timestamp.FromDateTime(message.DeliveryTimestamp.ToUniversalTime());
    }
    else if (!String.IsNullOrEmpty(message.MessageGroup))
    {
        entry.SystemProperties.MessageType = rmq::MessageType.Fifo;
        entry.SystemProperties.MessageGroup = message.MessageGroup;
    }

    if (!string.IsNullOrEmpty(message.Tag))
    {
        entry.SystemProperties.Tag = message.Tag;
    }

    foreach (var key in message.Keys)
    {
        entry.SystemProperties.Keys.Add(key);
    }

    // One target URL per candidate queue; each target is one send attempt.
    List<rmq::MessageQueue> candidates = publishLb.Select(message.MaxAttemptTimes);
    List<string> targets = new List<string>(candidates.Count);
    foreach (var messageQueue in candidates)
    {
        targets.Add(Utilities.TargetUrl(messageQueue));
    }

    var metadata = new Metadata();
    Signature.sign(this, metadata);

    Exception ex = null;
    foreach (var target in targets)
    {
        try
        {
            var stopWatch = Stopwatch.StartNew();
            rmq::SendMessageResponse response = await Manager.SendMessage(target, metadata, request, RequestTimeout);
            if (null != response && rmq::Code.Ok == response.Status.Code)
            {
                var messageId = response.Entries[0].MessageId;
                // Account latency histogram
                stopWatch.Stop();
                var latency = stopWatch.ElapsedMilliseconds;
                _sendLatency.Record(latency, new("topic", message.Topic), new("client_id", clientId()));
                return new SendReceipt(messageId);
            }

            // Leave a trace of rejected attempts so a total failure is diagnosable.
            var status = null == response ? "no response" : response.Status.Code.ToString();
            Logger.Warn($"Failed to send message to {target}, status: {status}");
        }
        catch (Exception e)
        {
            // Account failure count
            _sendFailureTotal.Add(1, new("topic", message.Topic), new("client_id", clientId()));
            Logger.Info(e, $"Failed to send message to {target}");
            ex = e;
        }
    }

    if (null != ex)
    {
        Logger.Error(ex, $"Failed to send message after {message.MaxAttemptTimes} attempts");
        // Rethrow the same exception object while keeping its original stack trace
        // (a plain `throw ex;` would reset it to this line).
        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(ex).Throw();
    }

    Logger.Error($"Failed to send message after {message.MaxAttemptTimes} attempts with unspecified reasons");
    throw new Exception("Send message failed");
}