csharp/rocketmq-client-csharp/ProcessQueue.cs (617 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Apache.Rocketmq.V2;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Org.Apache.Rocketmq.Error;
namespace Org.Apache.Rocketmq
{
/// <summary>
/// Process queue is a cache that stores messages fetched from remote for <c>PushConsumer</c>.
///
/// <c>PushConsumer</c> queries assignments periodically and converts them into message queues; each message queue
/// is mapped to one process queue, which fetches messages from remote. If a message queue is removed from the
/// newest assignment, the corresponding process queue will soon be marked as expired, which means its lifecycle
/// is over.
/// </summary>
public class ProcessQueue
{
/// <summary>Logger shared by all <c>ProcessQueue</c> instances.</summary>
private static readonly ILogger Logger = MqLogManager.CreateLogger<ProcessQueue>();
/// <summary>Delay before a failed ack request is attempted again.</summary>
internal static readonly TimeSpan AckMessageFailureBackoffDelay = TimeSpan.FromSeconds(1);
/// <summary>Delay before a failed change-invisible-duration request is attempted again.</summary>
internal static readonly TimeSpan ChangeInvisibleDurationFailureBackoffDelay = TimeSpan.FromSeconds(1);
/// <summary>Delay before a failed forward-to-dead-letter-queue request is attempted again.</summary>
internal static readonly TimeSpan ForwardMessageToDeadLetterQueueFailureBackoffDelay = TimeSpan.FromSeconds(1);
/// <summary>Delay before receiving again after a <c>TooManyRequestsException</c>.</summary>
private static readonly TimeSpan ReceivingFlowControlBackoffDelay = TimeSpan.FromMilliseconds(20);
/// <summary>Delay before receiving again after any other receive failure.</summary>
private static readonly TimeSpan ReceivingFailureBackoffDelay = TimeSpan.FromSeconds(1);
/// <summary>Delay before receiving again when the local cache is found to be full.</summary>
private static readonly TimeSpan ReceivingBackoffDelayWhenCacheIsFull = TimeSpan.FromSeconds(1);
/// <summary>The push consumer that owns this process queue; source of settings, RPC access and the consume service.</summary>
private readonly PushConsumer _consumer;
/// <summary>
/// Dropped means the process queue is deprecated: once set, no further receive request is issued for it.
/// </summary>
private volatile bool _dropped;
/// <summary>The message queue this process queue is mapped to.</summary>
private readonly MessageQueue _mq;
/// <summary>Filter expression attached to every receive request.</summary>
private readonly FilterExpression _filterExpression;
/// <summary>
/// Messages that have been fetched and cached but not yet erased or discarded.
/// Access is guarded by <c>_cachedMessageLock</c>.
/// </summary>
private readonly List<MessageView> _cachedMessages;
/// <summary>Reader/writer lock protecting <c>_cachedMessages</c>.</summary>
private readonly ReaderWriterLockSlim _cachedMessageLock;
/// <summary>Sum of the body lengths of the cached messages; updated and read through <c>Interlocked</c>.</summary>
private long _cachedMessagesBytes;
/// <summary>UTC ticks of the latest receive request (initially the construction time).</summary>
private long _activityTime = DateTime.UtcNow.Ticks;
/// <summary>UTC ticks of the latest moment the cache was found full; <c>long.MinValue</c> until that first happens.</summary>
private long _cacheFullTime = long.MinValue;
/// <summary>Cancels pending delayed receive attempts.</summary>
private readonly CancellationTokenSource _receiveMsgCts;
/// <summary>Cancels pending delayed ack retries.</summary>
private readonly CancellationTokenSource _ackMsgCts;
/// <summary>Cancels pending delayed change-invisible-duration retries.</summary>
private readonly CancellationTokenSource _changeInvisibleDurationCts;
/// <summary>Cancels pending delayed forward-to-dead-letter-queue retries.</summary>
private readonly CancellationTokenSource _forwardMessageToDeadLetterQueueCts;
/// <summary>
/// Creates a process queue for the given message queue, starting with an empty cache and not dropped.
/// </summary>
/// <param name="consumer">the owning push consumer.</param>
/// <param name="mq">the message queue to fetch from.</param>
/// <param name="filterExpression">filter expression applied to receive requests.</param>
/// <param name="receiveMsgCts">cancellation source for delayed receive attempts.</param>
/// <param name="ackMsgCts">cancellation source for delayed ack retries.</param>
/// <param name="changeInvisibleDurationCts">cancellation source for delayed change-invisible-duration retries.</param>
/// <param name="forwardMessageToDeadLetterQueueCts">cancellation source for delayed dead-letter-queue retries.</param>
public ProcessQueue(PushConsumer consumer, MessageQueue mq, FilterExpression filterExpression,
    CancellationTokenSource receiveMsgCts, CancellationTokenSource ackMsgCts,
    CancellationTokenSource changeInvisibleDurationCts, CancellationTokenSource forwardMessageToDeadLetterQueueCts)
{
    // Collaborators supplied by the caller.
    _consumer = consumer;
    _mq = mq;
    _filterExpression = filterExpression;

    // Cancellation sources for the individual delayed-retry loops.
    _receiveMsgCts = receiveMsgCts;
    _ackMsgCts = ackMsgCts;
    _changeInvisibleDurationCts = changeInvisibleDurationCts;
    _forwardMessageToDeadLetterQueueCts = forwardMessageToDeadLetterQueueCts;

    // Local state: alive, with an empty cache.
    _dropped = false;
    _cachedMessageLock = new ReaderWriterLockSlim();
    _cachedMessages = new List<MessageView>();
    _cachedMessagesBytes = 0;
}
/// <summary>
/// Returns the message queue this process queue is mapped to.
/// </summary>
/// <returns>the mapped message queue.</returns>
public MessageQueue GetMessageQueue() => _mq;
/// <summary>
/// Marks this process queue as dropped, ending its lifecycle: once the flag is set,
/// <c>ReceiveMessage</c> no longer issues receive requests for it.
/// </summary>
public void Drop() => _dropped = true;
/// <summary>
/// Tells whether this process queue should be regarded as expired, i.e. no receive request has been issued
/// for a long time and the inactivity is not explained by the cache having been full recently.
/// </summary>
/// <remarks>
/// The allowed idle time is three times the sum of the long-polling timeout and the request timeout.
/// </remarks>
/// <returns><c>true</c> if the queue is expired; otherwise <c>false</c>.</returns>
public bool Expired()
{
    var longPollingTimeout = _consumer.GetPushConsumerSettings().GetLongPollingTimeout();
    var requestTimeout = _consumer.GetClientConfig().RequestTimeout;
    var maxIdleDuration = longPollingTimeout.Add(requestTimeout).Multiply(3);
    var nowTicks = DateTime.UtcNow.Ticks;

    var idleDuration = nowTicks - Interlocked.Read(ref _activityTime);
    if (idleDuration < maxIdleDuration.Ticks)
    {
        return false;
    }

    // long.MinValue is the "cache has never been full" sentinel. Subtracting it from the current ticks
    // overflows and wraps to a negative number, which used to make a never-full queue look as if its cache
    // had just been full, so it could never expire. Treat the sentinel as "infinitely long ago" instead.
    var cacheFullTime = Interlocked.Read(ref _cacheFullTime);
    var afterCacheFullDuration = cacheFullTime == long.MinValue ? long.MaxValue : nowTicks - cacheFullTime;
    if (afterCacheFullDuration < maxIdleDuration.Ticks)
    {
        return false;
    }

    Logger.LogWarning(
        $"Process queue is idle, idleDuration={idleDuration}, maxIdleDuration={maxIdleDuration}," +
        $" afterCacheFullDuration={afterCacheFullDuration}, mq={_mq}, clientId={_consumer.GetClientId()}");
    return true;
}
/// <summary>
/// Appends the given messages to the cache and adds each body length to the cached-bytes counter.
/// </summary>
/// <param name="messageList">the messages to cache.</param>
internal void CacheMessages(List<MessageView> messageList)
{
    _cachedMessageLock.EnterWriteLock();
    try
    {
        messageList.ForEach(view =>
        {
            _cachedMessages.Add(view);
            Interlocked.Add(ref _cachedMessagesBytes, view.Body.Length);
        });
    }
    finally
    {
        _cachedMessageLock.ExitWriteLock();
    }
}
/// <summary>
/// Computes how many messages the next receive request should ask for: the free room in the cache
/// (at least 1), capped by the configured receive batch size.
/// </summary>
/// <returns>the batch size for the next receive request.</returns>
private int GetReceptionBatchSize()
{
    var freeSlots = Math.Max(_consumer.CacheMessageCountThresholdPerQueue() - CachedMessagesCount(), 1);
    var configuredBatchSize = _consumer.GetPushConsumerSettings().GetReceiveBatchSize();
    return Math.Min(freeSlots, configuredBatchSize);
}
/// <summary>
/// Starts fetching messages from remote right away, using a freshly generated attempt id.
/// </summary>
public void FetchMessageImmediately() => ReceiveMessageImmediately();
/// <summary>
/// Handles a receive failure by scheduling another receive attempt after a backoff delay.
/// </summary>
/// <remarks>
/// A <c>TooManyRequestsException</c> uses the short flow-control backoff; anything else uses the regular
/// failure backoff. The exception may arrive wrapped in an <see cref="AggregateException"/> (that is what
/// <c>Task.Exception</c> yields), so the wrapper is unwrapped before the type is inspected.
/// </remarks>
/// <param name="t">the failure that interrupted message reception.</param>
/// <param name="attemptId">attempt id to use for the next receive request; may be null.</param>
public void OnReceiveMessageException(Exception t, string attemptId)
{
    // Task.Exception is always an AggregateException, so a plain type test on it would never match.
    var cause = t is AggregateException aggregate ? aggregate.GetBaseException() : t;
    var delay = cause is TooManyRequestsException
        ? ReceivingFlowControlBackoffDelay
        : ReceivingFailureBackoffDelay;
    ReceiveMessageLater(delay, attemptId);
}
/// <summary>
/// Schedules a receive attempt to run after <paramref name="delay"/> on a background task.
/// </summary>
/// <param name="delay">how long to wait before receiving again.</param>
/// <param name="attemptId">attempt id handed to the delayed receive call.</param>
private void ReceiveMessageLater(TimeSpan delay, string attemptId)
{
    var clientId = _consumer.GetClientId();
    Logger.LogInformation($"Try to receive message later, mq={_mq}, delay={delay}, clientId={clientId}");
    Task.Run(async () =>
    {
        try
        {
            await Task.Delay(delay, _receiveMsgCts.Token);
            ReceiveMessage(attemptId);
        }
        catch (Exception) when (_receiveMsgCts.IsCancellationRequested)
        {
            // Receiving has been cancelled: stop quietly, nothing to reschedule.
        }
        catch (Exception ex)
        {
            Logger.LogError(ex, $"[Bug] Failed to schedule message receiving request, mq={_mq}, clientId={clientId}");
            OnReceiveMessageException(ex, attemptId);
        }
    });
}
/// <summary>
/// Produces a new attempt id: the string form of a freshly generated GUID.
/// </summary>
/// <returns>a new attempt id.</returns>
private string GenerateAttemptId() => Guid.NewGuid().ToString();
/// <summary>
/// Receives messages using a newly generated attempt id.
/// </summary>
public void ReceiveMessage() => ReceiveMessage(GenerateAttemptId());
/// <summary>
/// Receives messages with the given attempt id, unless the queue has been dropped. When the cache is
/// full the attempt is postponed instead of being sent.
/// </summary>
/// <param name="attemptId">attempt id for the receive request.</param>
public void ReceiveMessage(string attemptId)
{
    var clientId = _consumer.GetClientId();
    if (_dropped)
    {
        Logger.LogInformation($"Process queue has been dropped, no longer receive message, mq={_mq}, clientId={clientId}");
        return;
    }

    if (!IsCacheFull())
    {
        ReceiveMessageImmediately(attemptId);
        return;
    }

    // No room left locally: back off and try again later with the same attempt id.
    Logger.LogWarning($"Process queue cache is full, would receive message later, mq={_mq}, clientId={clientId}");
    ReceiveMessageLater(ReceivingBackoffDelayWhenCacheIsFull, attemptId);
}
/// <summary>
/// Issues a receive request right away with a newly generated attempt id.
/// </summary>
private void ReceiveMessageImmediately() => ReceiveMessageImmediately(GenerateAttemptId());
/// <summary>
/// Issues one receive request for the mapped message queue and attaches the handling of its outcome.
/// </summary>
/// <remarks>
/// Nothing is sent unless the consumer state is <c>State.Running</c>. Every failure, whether raised while
/// building/sending the request or reported by the returned task, is routed to
/// <c>OnReceiveMessageException</c>, which schedules another attempt.
/// </remarks>
/// <param name="attemptId">attempt id placed into the receive request.</param>
private void ReceiveMessageImmediately(string attemptId)
{
var clientId = _consumer.GetClientId();
if (_consumer.State != State.Running)
{
Logger.LogInformation($"Stop to receive message because consumer is not running, mq={_mq}, clientId={clientId}");
return;
}
try
{
var endpoints = _mq.Broker.Endpoints;
var batchSize = GetReceptionBatchSize();
var longPollingTimeout = _consumer.GetPushConsumerSettings().GetLongPollingTimeout();
var request = _consumer.WrapReceiveMessageRequest(batchSize, _mq, _filterExpression, longPollingTimeout,
attemptId);
// Record the activity time before the request goes out; Expired() measures idleness from this value.
Interlocked.Exchange(ref _activityTime, DateTime.UtcNow.Ticks);
var task = _consumer.ReceiveMessage(request, _mq, longPollingTimeout);
task.ContinueWith(t =>
{
if (t.IsFaulted)
{
// Only a gRPC deadline-exceeded failure reuses the attempt id of this request;
// any other failure passes null as the next attempt id.
string nextAttemptId = null;
if (t.Exception is { InnerException: RpcException { StatusCode: StatusCode.DeadlineExceeded } })
{
nextAttemptId = request.AttemptId;
}
Logger.LogError(t.Exception, $"Exception raised during message reception, mq={_mq}," +
$" attemptId={request.AttemptId}, nextAttemptId={nextAttemptId}," +
$" clientId={clientId}");
OnReceiveMessageException(t.Exception, nextAttemptId);
}
else
{
try
{
var result = t.Result;
OnReceiveMessageResult(result);
}
catch (Exception ex)
{
// Not expected to happen: handling the result failed. Log it and schedule another
// attempt with the original attempt id.
Logger.LogError($"[Bug] Exception raised while handling receive result, mq={_mq}," +
$" endpoints={endpoints}, clientId={clientId}, exception={ex}");
OnReceiveMessageException(ex, attemptId);
}
}
}, TaskContinuationOptions.ExecuteSynchronously);
}
catch (Exception ex)
{
// Failure raised synchronously while preparing or starting the request.
Logger.LogError(ex, $"Exception raised during message reception, mq={_mq}, clientId={clientId}");
OnReceiveMessageException(ex, attemptId);
}
}
/// <summary>
/// Handles a successful receive: caches any returned messages, hands them to the consume service, and
/// then starts the next receive round.
/// </summary>
/// <param name="result">the receive result.</param>
private void OnReceiveMessageResult(ReceiveMessageResult result)
{
    var received = result.Messages;
    var hasMessages = received.Count > 0;
    if (hasMessages)
    {
        CacheMessages(received);
        _consumer.GetConsumeService().Consume(this, received);
    }

    // Keep the receive loop going regardless of whether this round returned anything.
    ReceiveMessage();
}
/// <summary>
/// Checks whether the cache has reached its message-count threshold or its byte-size threshold.
/// When it has, the current UTC ticks are stored in <c>_cacheFullTime</c>.
/// </summary>
/// <returns><c>true</c> if either threshold is reached; otherwise <c>false</c>.</returns>
private bool IsCacheFull()
{
    var countLimit = _consumer.CacheMessageCountThresholdPerQueue();
    var cachedCount = CachedMessagesCount();
    var clientId = _consumer.GetClientId();
    if (cachedCount >= countLimit)
    {
        Logger.LogWarning($"Process queue total cached messages quantity exceeds the threshold," +
                          $" threshold={countLimit}, actual={cachedCount}," +
                          $" mq={_mq}, clientId={clientId}");
        Interlocked.Exchange(ref _cacheFullTime, DateTime.UtcNow.Ticks);
        return true;
    }

    var bytesLimit = _consumer.CacheMessageBytesThresholdPerQueue();
    var cachedBytes = CachedMessageBytes();
    if (cachedBytes < bytesLimit)
    {
        return false;
    }

    Logger.LogWarning($"Process queue total cached messages memory exceeds the threshold," +
                      $" threshold={bytesLimit} bytes," +
                      $" actual={cachedBytes} bytes, mq={_mq}, clientId={clientId}");
    Interlocked.Exchange(ref _cacheFullTime, DateTime.UtcNow.Ticks);
    return true;
}
/// <summary>
/// Erases a message (non-FIFO consume mode) after consumption: it is acked on success and nacked
/// otherwise, and removed from the cache once that request has finished.
/// </summary>
/// <param name="messageView">the message to erase.</param>
/// <param name="consumeResult">consume result.</param>
public void EraseMessage(MessageView messageView, ConsumeResult consumeResult)
{
    Task settleTask;
    if (ConsumeResult.SUCCESS.Equals(consumeResult))
    {
        settleTask = AckMessage(messageView);
    }
    else
    {
        settleTask = NackMessage(messageView);
    }

    _ = settleTask.ContinueWith(finished => EvictCache(messageView),
        TaskContinuationOptions.ExecuteSynchronously);
}
/// <summary>
/// Starts acknowledging the message (first attempt) and returns a task that tracks the outcome.
/// </summary>
/// <param name="messageView">the message to ack.</param>
/// <returns>the task of the completion source passed to the ack attempts.</returns>
private Task AckMessage(MessageView messageView)
{
    var completion = new TaskCompletionSource<bool>();
    AckMessage(messageView, 1, completion);
    return completion.Task;
}
/// <summary>
/// Sends one ack request for the message and settles <paramref name="tcs"/> according to the outcome.
/// </summary>
/// <remarks>
/// <list type="bullet">
/// <item>Request failed or was cancelled: retried later with the next attempt number.</item>
/// <item>Status <c>InvalidReceiptHandle</c>: <paramref name="tcs"/> is faulted and no retry is made.</item>
/// <item>Any other non-OK status: retried later with the next attempt number.</item>
/// <item>OK: <paramref name="tcs"/> is completed with <c>true</c>.</item>
/// </list>
/// </remarks>
/// <param name="messageView">the message to ack.</param>
/// <param name="attempt">1-based number of this attempt, used for logging.</param>
/// <param name="tcs">completion source observed by the caller of the first attempt.</param>
private void AckMessage(MessageView messageView, int attempt, TaskCompletionSource<bool> tcs)
{
    var clientId = _consumer.GetClientId();
    var consumerGroup = _consumer.GetConsumerGroup();
    var messageId = messageView.MessageId;
    var endpoints = messageView.MessageQueue.Broker.Endpoints;
    var request = _consumer.WrapAckMessageRequest(messageView);
    var task = _consumer.GetClientManager().AckMessage(endpoints, request,
        _consumer.GetClientConfig().RequestTimeout);
    task.ContinueWith(responseTask =>
    {
        // A cancelled task has no result either; reading Result would throw inside this continuation
        // and leave tcs pending forever, so treat it like a failure.
        if (responseTask.IsFaulted || responseTask.IsCanceled)
        {
            Logger.LogError(responseTask.Exception, $"Exception raised while acknowledging message," +
                                                    $" would retry later, clientId={clientId}," +
                                                    $" consumerGroup={consumerGroup}," +
                                                    $" messageId={messageId}," +
                                                    $" mq={_mq}, endpoints={endpoints}");
            AckMessageLater(messageView, attempt + 1, tcs);
            return;
        }

        var invocation = responseTask.Result;
        var requestId = invocation.RequestId;
        var status = invocation.Response.Status;
        var statusCode = status.Code;
        if (statusCode == Code.InvalidReceiptHandle)
        {
            Logger.LogError($"Failed to ack message due to the invalid receipt handle, forgive to retry," +
                            $" clientId={clientId}, consumerGroup={consumerGroup}, messageId={messageId}," +
                            $" attempt={attempt}, mq={_mq}, endpoints={endpoints}, requestId={requestId}," +
                            $" status message={status.Message}");
            tcs.SetException(new BadRequestException((int)statusCode, requestId, status.Message));
            // Stop here: falling through would schedule a retry and later settle the already-faulted
            // tcs a second time, which throws InvalidOperationException.
            return;
        }

        if (statusCode != Code.Ok)
        {
            Logger.LogError(
                $"Failed to ack message, would retry later, clientId={clientId}," +
                $" consumerGroup={consumerGroup}, messageId={messageId}, attempt={attempt}, mq={_mq}," +
                $" endpoints={endpoints}, requestId={requestId}, status message={status.Message}");
            AckMessageLater(messageView, attempt + 1, tcs);
            return;
        }

        tcs.SetResult(true);
        if (attempt > 1)
        {
            Logger.LogInformation($"Successfully acked message finally, clientId={clientId}," +
                                  $" consumerGroup={consumerGroup}, messageId={messageId}," +
                                  $" attempt={attempt}, mq={_mq}, endpoints={endpoints}," +
                                  $" requestId={requestId}");
        }
        else
        {
            Logger.LogDebug($"Successfully acked message, clientId={clientId}," +
                            $" consumerGroup={consumerGroup}, messageId={messageId}, mq={_mq}," +
                            $" endpoints={endpoints}, requestId={requestId}");
        }
    }, TaskContinuationOptions.ExecuteSynchronously);
}
/// <summary>
/// Schedules another ack attempt after <c>AckMessageFailureBackoffDelay</c> on a background task.
/// </summary>
/// <remarks>
/// Once <c>_ackMsgCts</c> is cancelled, a failure while waiting or sending ends the retry chain silently.
/// </remarks>
/// <param name="messageView">the message to ack.</param>
/// <param name="attempt">number of the attempt to perform; the caller has already advanced it.</param>
/// <param name="tcs">completion source to settle when the ack finally succeeds or is given up.</param>
private void AckMessageLater(MessageView messageView, int attempt, TaskCompletionSource<bool> tcs)
{
    Task.Run(async () =>
    {
        try
        {
            await Task.Delay(AckMessageFailureBackoffDelay, _ackMsgCts.Token);
            // Use the attempt number as given: the caller passes "attempt + 1" already, so adding one
            // here again would make every retry count twice in the logs.
            AckMessage(messageView, attempt, tcs);
        }
        catch (Exception ex)
        {
            if (_ackMsgCts.IsCancellationRequested)
            {
                return;
            }

            // Not expected to happen: scheduling or sending failed synchronously; try once more later.
            Logger.LogError(ex, $"[Bug] Failed to schedule message ack request, mq={_mq}," +
                                $" messageId={messageView.MessageId}, clientId={_consumer.GetClientId()}");
            AckMessageLater(messageView, attempt + 1, tcs);
        }
    });
}
/// <summary>
/// Negatively acknowledges the message by changing its invisible duration to the retry policy's next
/// attempt delay for the message's current delivery attempt.
/// </summary>
/// <param name="messageView">the message to nack.</param>
/// <returns>the task of the completion source passed to the change-invisible-duration attempts.</returns>
private Task NackMessage(MessageView messageView)
{
    var nextDelay = _consumer.GetRetryPolicy().GetNextAttemptDelay(messageView.DeliveryAttempt);
    var completion = new TaskCompletionSource<bool>();
    ChangeInvisibleDuration(messageView, nextDelay, 1, completion);
    return completion.Task;
}
/// <summary>
/// Sends one change-invisible-duration request for the message and settles <paramref name="tcs"/>
/// according to the outcome.
/// </summary>
/// <remarks>
/// <list type="bullet">
/// <item>Request failed or was cancelled: retried later with the next attempt number.</item>
/// <item>Status <c>InvalidReceiptHandle</c>: <paramref name="tcs"/> is faulted and no retry is made.</item>
/// <item>Any other non-OK status: retried later with the next attempt number.</item>
/// <item>OK: <paramref name="tcs"/> is completed with <c>true</c>.</item>
/// </list>
/// </remarks>
/// <param name="messageView">the message whose invisible duration is changed.</param>
/// <param name="duration">the new invisible duration.</param>
/// <param name="attempt">1-based number of this attempt, used for logging.</param>
/// <param name="tcs">completion source observed by the caller of the first attempt.</param>
private void ChangeInvisibleDuration(MessageView messageView, TimeSpan duration, int attempt,
    TaskCompletionSource<bool> tcs)
{
    var clientId = _consumer.GetClientId();
    var consumerGroup = _consumer.GetConsumerGroup();
    var messageId = messageView.MessageId;
    var endpoints = messageView.MessageQueue.Broker.Endpoints;
    var request = _consumer.WrapChangeInvisibleDuration(messageView, duration);
    var task = _consumer.GetClientManager().ChangeInvisibleDuration(endpoints,
        request, _consumer.GetClientConfig().RequestTimeout);
    task.ContinueWith(responseTask =>
    {
        // A cancelled task has no result either; reading Result would throw inside this continuation
        // and leave tcs pending forever, so treat it like a failure.
        if (responseTask.IsFaulted || responseTask.IsCanceled)
        {
            Logger.LogError(responseTask.Exception, $"Exception raised while changing invisible" +
                                                    $" duration, would retry later, clientId={clientId}," +
                                                    $" consumerGroup={consumerGroup}," +
                                                    $" messageId={messageId}, mq={_mq}," +
                                                    $" endpoints={endpoints}");
            ChangeInvisibleDurationLater(messageView, duration, attempt + 1, tcs);
            return;
        }

        var invocation = responseTask.Result;
        var requestId = invocation.RequestId;
        var status = invocation.Response.Status;
        var statusCode = status.Code;
        if (statusCode == Code.InvalidReceiptHandle)
        {
            Logger.LogError($"Failed to change invisible duration due to the invalid receipt handle," +
                            $" forgive to retry, clientId={clientId}, consumerGroup={consumerGroup}," +
                            $" messageId={messageId}, attempt={attempt}, mq={_mq}, endpoints={endpoints}," +
                            $" requestId={requestId}, status message={status.Message}");
            tcs.SetException(new BadRequestException((int)statusCode, requestId, status.Message));
            // Stop here: falling through would schedule a retry and later settle the already-faulted
            // tcs a second time, which throws InvalidOperationException.
            return;
        }

        if (statusCode != Code.Ok)
        {
            Logger.LogError($"Failed to change invisible duration, would retry later," +
                            $" clientId={clientId}, consumerGroup={consumerGroup}, messageId={messageId}," +
                            $" attempt={attempt}, mq={_mq}, endpoints={endpoints}, requestId={requestId}," +
                            $" status message={status.Message}");
            ChangeInvisibleDurationLater(messageView, duration, attempt + 1, tcs);
            return;
        }

        tcs.SetResult(true);
        if (attempt > 1)
        {
            Logger.LogInformation($"Finally, changed invisible duration successfully," +
                                  $" clientId={clientId}, consumerGroup={consumerGroup}," +
                                  $" messageId={messageId}, attempt={attempt}, mq={_mq}," +
                                  $" endpoints={endpoints}, requestId={requestId}");
        }
        else
        {
            Logger.LogDebug($"Changed invisible duration successfully, clientId={clientId}," +
                            $" consumerGroup={consumerGroup}, messageId={messageId}, mq={_mq}," +
                            $" endpoints={endpoints}, requestId={requestId}");
        }
    });
}
/// <summary>
/// Schedules another change-invisible-duration attempt after
/// <c>ChangeInvisibleDurationFailureBackoffDelay</c> on a background task.
/// </summary>
/// <param name="messageView">the message whose invisible duration is changed.</param>
/// <param name="duration">the new invisible duration.</param>
/// <param name="attempt">number of the attempt to perform.</param>
/// <param name="tcs">completion source to settle when the request finally succeeds or is given up.</param>
private void ChangeInvisibleDurationLater(MessageView messageView, TimeSpan duration, int attempt,
    TaskCompletionSource<bool> tcs)
{
    Task.Run(async () =>
    {
        try
        {
            await Task.Delay(ChangeInvisibleDurationFailureBackoffDelay, _changeInvisibleDurationCts.Token);
            ChangeInvisibleDuration(messageView, duration, attempt, tcs);
        }
        catch (Exception) when (_changeInvisibleDurationCts.IsCancellationRequested)
        {
            // Retries have been cancelled: end the chain quietly.
        }
        catch (Exception ex)
        {
            Logger.LogError(ex, $"[Bug] Failed to schedule message change invisible duration request," +
                                $" mq={_mq}, messageId={messageView.MessageId}, clientId={_consumer.GetClientId()}");
            ChangeInvisibleDurationLater(messageView, duration, attempt + 1, tcs);
        }
    });
}
/// <summary>
/// Erases a FIFO message after consumption. A failed message that still has attempts left is redelivered
/// to the consume service after the retry policy's delay; otherwise the message is acked (success) or
/// forwarded to the dead letter queue (failure) and then removed from the cache.
/// </summary>
/// <param name="messageView">the FIFO message to erase.</param>
/// <param name="consumeResult">consume result.</param>
/// <returns>an already completed task.</returns>
public Task EraseFifoMessage(MessageView messageView, ConsumeResult consumeResult)
{
    // NOTE(review): the returned task is always Task.CompletedTask; it does not track the redelivery,
    // ack or dead-letter work started below. Confirm that callers awaiting it do not rely on that.
    var retryPolicy = _consumer.GetRetryPolicy();
    var maxAttempts = retryPolicy.GetMaxAttempts();
    var attempt = messageView.DeliveryAttempt;
    var messageId = messageView.MessageId;
    var service = _consumer.GetConsumeService();
    var clientId = _consumer.GetClientId();

    var shouldRedeliver = consumeResult == ConsumeResult.FAILURE && attempt < maxAttempts;
    if (!shouldRedeliver)
    {
        var consumedSuccessfully = consumeResult == ConsumeResult.SUCCESS;
        if (!consumedSuccessfully)
        {
            Logger.LogInformation($"Failed to consume fifo message finally, run out of attempt times," +
                                  $" maxAttempts={maxAttempts}, attempt={attempt}, mq={messageView.MessageQueue}," +
                                  $" messageId={messageId}, clientId={clientId}");
        }

        Task settleTask;
        if (ConsumeResult.SUCCESS.Equals(consumeResult))
        {
            settleTask = AckMessage(messageView);
        }
        else
        {
            settleTask = ForwardToDeadLetterQueue(messageView);
        }

        _ = settleTask.ContinueWith(finished => EvictCache(messageView),
            TaskContinuationOptions.ExecuteSynchronously);
        return Task.CompletedTask;
    }

    // The delay is derived from the attempt number before it is incremented.
    var nextAttemptDelay = retryPolicy.GetNextAttemptDelay(attempt);
    attempt = messageView.IncrementAndGetDeliveryAttempt();
    Logger.LogDebug($"Prepare to redeliver the fifo message because of the consumption failure," +
                    $" maxAttempt={maxAttempts}, attempt={attempt}, mq={messageView.MessageQueue}," +
                    $" messageId={messageId}, nextAttemptDelay={nextAttemptDelay}, clientId={clientId}");
    var redeliverTask = service.Consume(messageView, nextAttemptDelay);
    _ = redeliverTask.ContinueWith(async redelivered =>
    {
        var redeliveryResult = await redelivered;
        await EraseFifoMessage(messageView, redeliveryResult);
    }, TaskContinuationOptions.ExecuteSynchronously);
    return Task.CompletedTask;
}
/// <summary>
/// Starts forwarding the message to the dead letter queue (first attempt) and returns a task that tracks
/// the outcome.
/// </summary>
/// <param name="messageView">the message to forward.</param>
/// <returns>the task of the completion source passed to the forward attempts.</returns>
private Task ForwardToDeadLetterQueue(MessageView messageView)
{
    var completion = new TaskCompletionSource<bool>();
    ForwardToDeadLetterQueue(messageView, 1, completion);
    return completion.Task;
}
/// <summary>
/// Sends one forward-to-dead-letter-queue request for the message and settles <paramref name="tcs"/>
/// once the request succeeds.
/// </summary>
/// <remarks>
/// A failed or cancelled request, as well as any non-OK status, is retried later with the next attempt
/// number; <paramref name="tcs"/> is completed with <c>true</c> only on an OK status.
/// </remarks>
/// <param name="messageView">the message to forward.</param>
/// <param name="attempt">1-based number of this attempt, used for logging.</param>
/// <param name="tcs">completion source observed by the caller of the first attempt.</param>
private void ForwardToDeadLetterQueue(MessageView messageView, int attempt, TaskCompletionSource<bool> tcs)
{
    var clientId = _consumer.GetClientId();
    var consumerGroup = _consumer.GetConsumerGroup();
    var messageId = messageView.MessageId;
    var endpoints = messageView.MessageQueue.Broker.Endpoints;
    var request = _consumer.WrapForwardMessageToDeadLetterQueueRequest(messageView);
    var task = _consumer.GetClientManager().ForwardMessageToDeadLetterQueue(endpoints, request,
        _consumer.GetClientConfig().RequestTimeout);
    task.ContinueWith(responseTask =>
    {
        // A cancelled task has no result either; reading Result would throw inside this continuation
        // and leave tcs pending forever, so treat it like a failure.
        if (responseTask.IsFaulted || responseTask.IsCanceled)
        {
            // Pass the exception as the first argument: as a trailing argument it is taken for a
            // message-template value and never logged.
            Logger.LogError(responseTask.Exception,
                $"Exception raised while forward message to DLQ, would attempt to re-forward later, " +
                $"clientId={clientId}," +
                $" consumerGroup={consumerGroup}," +
                $" messageId={messageId}, mq={_mq}");
            // Advance the attempt number so that a later success is reported as a re-forward.
            ForwardToDeadLetterQueueLater(messageView, attempt + 1, tcs);
            return;
        }

        var invocation = responseTask.Result;
        var requestId = invocation.RequestId;
        var status = invocation.Response.Status;
        var statusCode = status.Code;
        if (statusCode != Code.Ok)
        {
            Logger.LogError($"Failed to forward message to dead letter queue," +
                            $" would attempt to re-forward later, clientId={clientId}," +
                            $" consumerGroup={consumerGroup}, messageId={messageId}," +
                            $" attempt={attempt}, mq={_mq}, endpoints={endpoints}," +
                            $" requestId={requestId}, code={statusCode}," +
                            $" status message={status.Message}");
            ForwardToDeadLetterQueueLater(messageView, attempt + 1, tcs);
            return;
        }

        tcs.SetResult(true);
        if (attempt > 1)
        {
            Logger.LogInformation($"Re-forward message to dead letter queue successfully, " +
                                  $"clientId={clientId}, consumerGroup={consumerGroup}," +
                                  $" attempt={attempt}, messageId={messageId}, mq={_mq}," +
                                  $" endpoints={endpoints}, requestId={requestId}");
        }
        else
        {
            Logger.LogInformation($"Forward message to dead letter queue successfully, " +
                                  $"clientId={clientId}, consumerGroup={consumerGroup}," +
                                  $" messageId={messageId}, mq={_mq}, endpoints={endpoints}," +
                                  $" requestId={requestId}");
        }
    });
}
/// <summary>
/// Schedules another forward-to-dead-letter-queue attempt after
/// <c>ForwardMessageToDeadLetterQueueFailureBackoffDelay</c> on a background task.
/// </summary>
/// <remarks>
/// Once <c>_forwardMessageToDeadLetterQueueCts</c> is cancelled, a failure while waiting or sending ends
/// the retry chain silently.
/// </remarks>
/// <param name="messageView">the message to forward.</param>
/// <param name="attempt">number of the attempt to perform.</param>
/// <param name="tcs">completion source to settle when the forward finally succeeds.</param>
private void ForwardToDeadLetterQueueLater(MessageView messageView, int attempt, TaskCompletionSource<bool> tcs)
{
    Task.Run(async () =>
    {
        try
        {
            await Task.Delay(ForwardMessageToDeadLetterQueueFailureBackoffDelay,
                _forwardMessageToDeadLetterQueueCts.Token);
            ForwardToDeadLetterQueue(messageView, attempt, tcs);
        }
        catch (Exception ex)
        {
            // With a cancelled token Task.Delay fails immediately; rescheduling from here would spin
            // in a tight loop of background tasks, so stop the chain instead.
            if (_forwardMessageToDeadLetterQueueCts.IsCancellationRequested)
            {
                return;
            }

            // Not expected to happen. The exception goes first so that it is actually logged rather
            // than being taken for a message-template value.
            Logger.LogError(ex, $"[Bug] Failed to schedule DLQ message request, " +
                                $"mq={_mq}, messageId={messageView.MessageId}, clientId={_consumer.GetClientId()}");
            ForwardToDeadLetterQueueLater(messageView, attempt + 1, tcs);
        }
    });
}
/// <summary>
/// Discards a message (non-FIFO consume mode) that could not be consumed properly: the message is
/// nacked and removed from the cache once the nack has finished.
/// </summary>
/// <param name="messageView">the message to discard.</param>
public void DiscardMessage(MessageView messageView)
{
    Logger.LogInformation($"Discard message, mq={_mq}, messageId={messageView.MessageId}," +
                          $" clientId={_consumer.GetClientId()}");
    var nackTask = NackMessage(messageView);
    _ = nackTask.ContinueWith(finished => EvictCache(messageView),
        TaskContinuationOptions.ExecuteSynchronously);
}
/// <summary>
/// Discards a message (FIFO consume mode) that could not be consumed properly: the message is forwarded
/// to the dead letter queue and removed from the cache once the forward has finished.
/// </summary>
/// <param name="messageView">the FIFO message to discard.</param>
public void DiscardFifoMessage(MessageView messageView)
{
    Logger.LogInformation($"Discard fifo message, mq={_mq}, messageId={messageView.MessageId}," +
                          $" clientId={_consumer.GetClientId()}");
    var forwardTask = ForwardToDeadLetterQueue(messageView);
    _ = forwardTask.ContinueWith(finished => EvictCache(messageView),
        TaskContinuationOptions.ExecuteSynchronously);
}
/// <summary>
/// Removes the message from the cache and, if it was present, subtracts its body length from the
/// cached-bytes counter.
/// </summary>
/// <param name="messageView">the message to remove.</param>
private void EvictCache(MessageView messageView)
{
    _cachedMessageLock.EnterWriteLock();
    try
    {
        var removed = _cachedMessages.Remove(messageView);
        if (!removed)
        {
            // Not cached (any more): the byte counter must stay untouched.
            return;
        }

        Interlocked.Add(ref _cachedMessagesBytes, -messageView.Body.Length);
    }
    finally
    {
        _cachedMessageLock.ExitWriteLock();
    }
}
/// <summary>
/// Returns the number of messages currently held in the cache, read under the read lock.
/// </summary>
/// <returns>count of cached messages.</returns>
public int CachedMessagesCount()
{
    int cachedCount;
    _cachedMessageLock.EnterReadLock();
    try
    {
        cachedCount = _cachedMessages.Count;
    }
    finally
    {
        _cachedMessageLock.ExitReadLock();
    }

    return cachedCount;
}
/// <summary>
/// Returns the cached-bytes counter, read atomically.
/// </summary>
/// <returns>total body length of the cached messages, in bytes.</returns>
public long CachedMessageBytes() => Interlocked.Read(ref _cachedMessagesBytes);
/// <summary>
/// Get the count of cached messages, widened to <c>long</c>.
/// </summary>
/// <returns>count of cached messages.</returns>
public long GetCachedMessageCount() => CachedMessagesCount();
/// <summary>
/// Get the bytes of cached message memory footprint.
/// </summary>
/// <remarks>
/// The counter is modified with <c>Interlocked.Add</c> from other threads, so it is read with
/// <c>Interlocked.Read</c> as well; a plain read of a 64-bit field can be torn on 32-bit platforms.
/// </remarks>
/// <returns>bytes of cached message memory footprint.</returns>
public long GetCachedMessageBytes()
{
    return Interlocked.Read(ref _cachedMessagesBytes);
}
}
}