in csharp/rocketmq-client-csharp/Producer.cs [203:291]
private async Task<SendReceipt> Send0(PublishingMessage message, List<MessageQueue> candidates)
{
var retryPolicy = GetRetryPolicy();
var maxAttempts = retryPolicy.GetMaxAttempts();
Exception exception = null;
for (var attempt = 1; attempt <= maxAttempts; attempt++)
{
var stopwatch = Stopwatch.StartNew();
var candidateIndex = (attempt - 1) % candidates.Count;
var mq = candidates[candidateIndex];
if (PublishingSettings.IsValidateMessageType() && !mq.AcceptMessageTypes.Contains(message.MessageType))
{
throw new ArgumentException(
"Current message type does not match with the accept message types," +
$" topic={message.Topic}, actualMessageType={message.MessageType}" +
$" acceptMessageType={string.Join(",", mq.AcceptMessageTypes)}");
}
var sendMessageRequest = WrapSendMessageRequest(message, mq);
var endpoints = mq.Broker.Endpoints;
try
{
var invocation =
await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout);
var sendReceipts = SendReceipt.ProcessSendMessageResponse(mq, invocation);
var sendReceipt = sendReceipts.First();
if (attempt > 1)
{
Logger.Info(
$"Re-send message successfully, topic={message.Topic}, messageId={sendReceipt.MessageId}," +
$" maxAttempts={maxAttempts}, endpoints={endpoints}, clientId={ClientId}");
}
return sendReceipt;
}
catch (Exception e)
{
exception = e;
// Isolate current endpoints.
Isolated[endpoints] = true;
if (attempt >= maxAttempts)
{
Logger.Error(e, "Failed to send message finally, run out of attempt times, " +
$"topic={message.Topic}, maxAttempt={maxAttempts}, attempt={attempt}, " +
$"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
throw;
}
if (MessageType.Transaction == message.MessageType)
{
Logger.Error(e, "Failed to send transaction message, run out of attempt times, " +
$"topic={message.Topic}, maxAttempt=1, attempt={attempt}, " +
$"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
throw;
}
if (!(exception is TooManyRequestsException))
{
// Retry immediately if the request is not throttled.
Logger.Warn(e, $"Failed to send message, topic={message.Topic}, maxAttempts={maxAttempts}, " +
$"attempt={attempt}, endpoints={endpoints}, messageId={message.MessageId}," +
$" clientId={ClientId}");
continue;
}
var nextAttempt = 1 + attempt;
var delay = retryPolicy.GetNextAttemptDelay(nextAttempt);
await Task.Delay(delay);
Logger.Warn(e, "Failed to send message due to too many request, would attempt to resend " +
$"after {delay}, topic={message.Topic}, maxAttempts={maxAttempts}, attempt={attempt}, " +
$"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
}
finally
{
var elapsed = stopwatch.Elapsed.Milliseconds;
_sendCostTimeHistogram.Record(elapsed,
new KeyValuePair<string, object>(MetricConstant.Topic, message.Topic),
new KeyValuePair<string, object>(MetricConstant.ClientId, ClientId),
new KeyValuePair<string, object>(MetricConstant.InvocationStatus,
null == exception ? MetricConstant.Success : MetricConstant.Failure));
}
}
throw new Exception($"Failed to send message finally, topic={message.Topic}, clientId={ClientId}",
exception);
}