in csharp/rocketmq-client-csharp/ProcessQueue.cs [448:513]
/// <summary>
/// Sends a change-invisible-duration request for <paramref name="messageView"/> to the endpoints of
/// the broker that owns its message queue, and reports the outcome through <paramref name="tcs"/>.
/// </summary>
/// <remarks>
/// Outcome handling, performed in a continuation on the RPC task:
/// <list type="bullet">
/// <item>RPC task faulted: the error is logged and the request is handed to
/// <c>ChangeInvisibleDurationLater</c> with <c>attempt + 1</c>.</item>
/// <item>Status <c>Code.InvalidReceiptHandle</c>: <paramref name="tcs"/> is faulted with a
/// <c>BadRequestException</c> and no retry is attempted.</item>
/// <item>Any other non-OK status: the error is logged and the request is handed to
/// <c>ChangeInvisibleDurationLater</c> with <c>attempt + 1</c>.</item>
/// <item>Status <c>Code.Ok</c>: <paramref name="tcs"/> is completed with <c>true</c>.</item>
/// </list>
/// </remarks>
/// <param name="messageView">The message whose invisible duration is being changed.</param>
/// <param name="duration">The invisible duration passed to the request builder.</param>
/// <param name="attempt">The current attempt number; only used for logging and to compute the next attempt.</param>
/// <param name="tcs">Completion source that is resolved exactly once with the final outcome.</param>
private void ChangeInvisibleDuration(MessageView messageView, TimeSpan duration, int attempt,
    TaskCompletionSource<bool> tcs)
{
    // Capture everything needed for logging up front so the continuation does not re-query the consumer.
    var clientId = _consumer.GetClientId();
    var consumerGroup = _consumer.GetConsumerGroup();
    var messageId = messageView.MessageId;
    var endpoints = messageView.MessageQueue.Broker.Endpoints;
    var request = _consumer.WrapChangeInvisibleDuration(messageView, duration);
    var task = _consumer.GetClientManager().ChangeInvisibleDuration(endpoints,
        request, _consumer.GetClientConfig().RequestTimeout);
    task.ContinueWith(responseTask =>
    {
        if (responseTask.IsFaulted)
        {
            Logger.LogError(responseTask.Exception, $"Exception raised while changing invisible" +
                                                    $" duration, would retry later, clientId={clientId}," +
                                                    $" consumerGroup={consumerGroup}," +
                                                    $" messageId={messageId}, mq={_mq}," +
                                                    $" endpoints={endpoints}");
            ChangeInvisibleDurationLater(messageView, duration, attempt + 1, tcs);
            return;
        }

        var invocation = responseTask.Result;
        var requestId = invocation.RequestId;
        var status = invocation.Response.Status;
        var statusCode = status.Code;

        if (statusCode == Code.InvalidReceiptHandle)
        {
            Logger.LogError($"Failed to change invisible duration due to the invalid receipt handle," +
                            $" forgive to retry, clientId={clientId}, consumerGroup={consumerGroup}," +
                            $" messageId={messageId}, attempt={attempt}, mq={_mq}, endpoints={endpoints}," +
                            $" requestId={requestId}, status message={status.Message}");
            tcs.SetException(new BadRequestException((int)statusCode, requestId, status.Message));
            // Must stop here: falling through would hit the generic non-OK branch below, schedule a
            // retry that this branch explicitly gives up on, and later try to complete the
            // already-faulted tcs a second time (which throws InvalidOperationException).
            return;
        }

        if (statusCode != Code.Ok)
        {
            Logger.LogError($"Failed to change invisible duration, would retry later," +
                            $" clientId={clientId}, consumerGroup={consumerGroup}, messageId={messageId}," +
                            $" attempt={attempt}, mq={_mq}, endpoints={endpoints}, requestId={requestId}," +
                            $" status message={status.Message}");
            ChangeInvisibleDurationLater(messageView, duration, attempt + 1, tcs);
            return;
        }

        tcs.SetResult(true);

        // A success after more than one attempt is logged at Information level, a first-attempt
        // success only at Debug level.
        if (attempt > 1)
        {
            Logger.LogInformation($"Finally, changed invisible duration successfully," +
                                  $" clientId={clientId}, consumerGroup={consumerGroup}," +
                                  $" messageId={messageId}, attempt={attempt}, mq={_mq}," +
                                  $" endpoints={endpoints}, requestId={requestId}");
        }
        else
        {
            Logger.LogDebug($"Changed invisible duration successfully, clientId={clientId}," +
                            $" consumerGroup={consumerGroup}, messageId={messageId}, mq={_mq}," +
                            $" endpoints={endpoints}, requestId={requestId}");
        }
    });
}