in csharp/rocketmq-client-csharp/ProcessQueue.cs [350:415]
/// <summary>
/// Sends an ack request for <paramref name="messageView"/> to the broker endpoints of its message queue
/// and completes <paramref name="tcs"/> according to the outcome.
/// </summary>
/// <remarks>
/// Outcomes handled by the continuation:
/// <list type="bullet">
/// <item>The request task faulted or was canceled: the ack is handed to <c>AckMessageLater</c> with
/// <c>attempt + 1</c>; <paramref name="tcs"/> is left untouched here.</item>
/// <item>The broker answered <c>Code.InvalidReceiptHandle</c>: <paramref name="tcs"/> is faulted with a
/// <c>BadRequestException</c> and no retry is scheduled.</item>
/// <item>The broker answered any other non-Ok code: the ack is handed to <c>AckMessageLater</c> with
/// <c>attempt + 1</c>.</item>
/// <item>The broker answered <c>Code.Ok</c>: <paramref name="tcs"/> is completed with <c>true</c>.</item>
/// </list>
/// </remarks>
/// <param name="messageView">The message to acknowledge.</param>
/// <param name="attempt">The number of the current attempt; values greater than 1 are logged as a
/// success after retrying.</param>
/// <param name="tcs">Completion source signalled with the final result of the acknowledgement.</param>
private void AckMessage(MessageView messageView, int attempt, TaskCompletionSource<bool> tcs)
{
    var clientId = _consumer.GetClientId();
    var consumerGroup = _consumer.GetConsumerGroup();
    var messageId = messageView.MessageId;
    var endpoints = messageView.MessageQueue.Broker.Endpoints;
    var request = _consumer.WrapAckMessageRequest(messageView);
    var task = _consumer.GetClientManager().AckMessage(endpoints, request,
        _consumer.GetClientConfig().RequestTimeout);
    task.ContinueWith(responseTask =>
    {
        // A canceled task has no Result either; reading it would throw inside this continuation and
        // leave tcs uncompleted forever, so treat cancellation like a fault and retry later.
        // responseTask.Exception is null in the canceled case.
        if (responseTask.IsFaulted || responseTask.IsCanceled)
        {
            Logger.LogError(responseTask.Exception, $"Exception raised while acknowledging message," +
                                                    $" would retry later, clientId={clientId}," +
                                                    $" consumerGroup={consumerGroup}," +
                                                    $" messageId={messageId}," +
                                                    $" mq={_mq}, endpoints={endpoints}");
            AckMessageLater(messageView, attempt + 1, tcs);
            return;
        }

        var invocation = responseTask.Result;
        var requestId = invocation.RequestId;
        var status = invocation.Response.Status;
        var statusCode = status.Code;

        if (statusCode == Code.InvalidReceiptHandle)
        {
            Logger.LogError($"Failed to ack message due to the invalid receipt handle, forgive to retry," +
                            $" clientId={clientId}, consumerGroup={consumerGroup}, messageId={messageId}," +
                            $" attempt={attempt}, mq={_mq}, endpoints={endpoints}, requestId={requestId}," +
                            $" status message={status.Message}");
            tcs.SetException(new BadRequestException((int)statusCode, requestId, status.Message));
            // Terminal failure: tcs is already completed, so we must not fall through to the generic
            // non-Ok branch below, which would schedule a retry against the completed tcs.
            return;
        }

        if (statusCode != Code.Ok)
        {
            Logger.LogError(
                $"Failed to ack message, would retry later, clientId={clientId}," +
                $" consumerGroup={consumerGroup}, messageId={messageId}, attempt={attempt}, mq={_mq}," +
                $" endpoints={endpoints}, requestId={requestId}, status message={status.Message}");
            AckMessageLater(messageView, attempt + 1, tcs);
            return;
        }

        tcs.SetResult(true);
        if (attempt > 1)
        {
            Logger.LogInformation($"Successfully acked message finally, clientId={clientId}," +
                                  $" consumerGroup={consumerGroup}, messageId={messageId}," +
                                  $" attempt={attempt}, mq={_mq}, endpoints={endpoints}," +
                                  $" requestId={requestId}");
        }
        else
        {
            Logger.LogDebug($"Successfully acked message, clientId={clientId}," +
                            $" consumerGroup={consumerGroup}, messageId={messageId}, mq={_mq}," +
                            $" endpoints={endpoints}, requestId={requestId}");
        }
    }, TaskContinuationOptions.ExecuteSynchronously);
}