in csharp/rocketmq-client-csharp/ProcessQueue.cs [589:651]
/// <summary>
/// Sends a request to forward <paramref name="messageView"/> to the dead letter queue and reacts to the
/// outcome in a continuation: a transport failure, cancellation or non-OK status code is logged and handed
/// to <c>ForwardToDeadLetterQueueLater</c> for another attempt, while an OK status completes
/// <paramref name="tcs"/> with <c>true</c>.
/// </summary>
/// <param name="messageView">The message to forward; its message queue's broker endpoints are the RPC target.</param>
/// <param name="attempt">Attempt counter, used for logging and passed through unchanged when re-scheduling.</param>
/// <param name="tcs">Completion source that is resolved with <c>true</c> once the broker answers with <c>Code.Ok</c>.</param>
private void ForwardToDeadLetterQueue(MessageView messageView, int attempt, TaskCompletionSource<bool> tcs)
{
    var clientId = _consumer.GetClientId();
    var consumerGroup = _consumer.GetConsumerGroup();
    var messageId = messageView.MessageId;
    var endpoints = messageView.MessageQueue.Broker.Endpoints;
    var request = _consumer.WrapForwardMessageToDeadLetterQueueRequest(messageView);
    var task = _consumer.GetClientManager().ForwardMessageToDeadLetterQueue(endpoints, request,
        _consumer.GetClientConfig().RequestTimeout);
    task.ContinueWith(responseTask =>
    {
        // A cancelled task is not "faulted", but reading Result from it still throws. Because nobody
        // observes this continuation, that throw would be lost and tcs would never complete, so
        // cancellation is handled on the same retry path as a fault.
        if (responseTask.IsFaulted || responseTask.IsCanceled)
        {
            // Log failure and retry later. The exception goes first so that it is recorded as the
            // log entry's exception instead of being consumed as an (unused) format argument.
            // It is null when the task was cancelled rather than faulted.
            Logger.LogError(responseTask.Exception,
                $"Exception raised while forward message to DLQ, would attempt to re-forward later, " +
                $"clientId={clientId}," +
                $" consumerGroup={consumerGroup}," +
                $" messageId={messageId}, mq={_mq}");
            ForwardToDeadLetterQueueLater(messageView, attempt, tcs);
            return;
        }

        var invocation = responseTask.Result;
        var requestId = invocation.RequestId;
        var status = invocation.Response.Status;
        var statusCode = status.Code;
        // Log failure and retry later.
        if (statusCode != Code.Ok)
        {
            Logger.LogError($"Failed to forward message to dead letter queue," +
                            $" would attempt to re-forward later, clientId={clientId}," +
                            $" consumerGroup={consumerGroup}, messageId={messageId}," +
                            $" attempt={attempt}, mq={_mq}, endpoints={endpoints}," +
                            $" requestId={requestId}, code={statusCode}," +
                            $" status message={status.Message}");
            ForwardToDeadLetterQueueLater(messageView, attempt, tcs);
            return;
        }

        tcs.SetResult(true);
        // Log success; a distinct message is used when this was not the first attempt.
        if (attempt > 1)
        {
            Logger.LogInformation($"Re-forward message to dead letter queue successfully, " +
                                  $"clientId={clientId}, consumerGroup={consumerGroup}," +
                                  $" attempt={attempt}, messageId={messageId}, mq={_mq}," +
                                  $" endpoints={endpoints}, requestId={requestId}");
        }
        else
        {
            Logger.LogInformation($"Forward message to dead letter queue successfully, " +
                                  $"clientId={clientId}, consumerGroup={consumerGroup}," +
                                  $" messageId={messageId}, mq={_mq}, endpoints={endpoints}," +
                                  $" requestId={requestId}");
        }
    });
}