csharp/rocketmq-client-csharp/ProcessQueue.cs (617 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Apache.Rocketmq.V2; using Grpc.Core; using Microsoft.Extensions.Logging; using Org.Apache.Rocketmq.Error; namespace Org.Apache.Rocketmq { /// <summary> /// Process queue is a cache to store fetched messages from remote for <c>PushConsumer</c>. /// /// <c>PushConsumer</c> queries assignments periodically and converts them into message queues, each message queue is /// mapped into one process queue to fetch message from remote. If the message queue is removed from the newest /// assignment, the corresponding process queue is marked as expired soon, which means its lifecycle is over. /// </summary> public class ProcessQueue { private static readonly ILogger Logger = MqLogManager.CreateLogger<ProcessQueue>(); internal static readonly TimeSpan AckMessageFailureBackoffDelay = TimeSpan.FromSeconds(1); internal static readonly TimeSpan ChangeInvisibleDurationFailureBackoffDelay = TimeSpan.FromSeconds(1); internal static readonly TimeSpan ForwardMessageToDeadLetterQueueFailureBackoffDelay = TimeSpan.FromSeconds(1); private static readonly TimeSpan ReceivingFlowControlBackoffDelay = TimeSpan.FromMilliseconds(20); private static readonly TimeSpan ReceivingFailureBackoffDelay = TimeSpan.FromSeconds(1); private static readonly TimeSpan ReceivingBackoffDelayWhenCacheIsFull = TimeSpan.FromSeconds(1); private readonly PushConsumer _consumer; /// <summary> /// Dropped means ProcessQueue is deprecated, which means no message would be fetched from remote anymore. /// </summary> private volatile bool _dropped; private readonly MessageQueue _mq; private readonly FilterExpression _filterExpression; /// <summary> /// Messages which is pending means have been cached, but are not taken by consumer dispatcher yet. /// </summary> private readonly List<MessageView> _cachedMessages; private readonly ReaderWriterLockSlim _cachedMessageLock; private long _cachedMessagesBytes; private long _activityTime = DateTime.UtcNow.Ticks; private long _cacheFullTime = long.MinValue; private readonly CancellationTokenSource _receiveMsgCts; private readonly CancellationTokenSource _ackMsgCts; private readonly CancellationTokenSource _changeInvisibleDurationCts; private readonly CancellationTokenSource _forwardMessageToDeadLetterQueueCts; public ProcessQueue(PushConsumer consumer, MessageQueue mq, FilterExpression filterExpression, CancellationTokenSource receiveMsgCts, CancellationTokenSource ackMsgCts, CancellationTokenSource changeInvisibleDurationCts, CancellationTokenSource forwardMessageToDeadLetterQueueCts) { _consumer = consumer; _dropped = false; _mq = mq; _filterExpression = filterExpression; _cachedMessages = new List<MessageView>(); _cachedMessageLock = new ReaderWriterLockSlim(); _cachedMessagesBytes = 0; _receiveMsgCts = receiveMsgCts; _ackMsgCts = ackMsgCts; _changeInvisibleDurationCts = changeInvisibleDurationCts; _forwardMessageToDeadLetterQueueCts = forwardMessageToDeadLetterQueueCts; } /// <summary> /// Get the mapped message queue. /// </summary> /// <returns>mapped message queue.</returns> public MessageQueue GetMessageQueue() { return _mq; } /// <summary> /// Drop the current process queue, which means the process queue's lifecycle is over, /// thus it would not fetch messages from the remote anymore if dropped. /// </summary> public void Drop() { _dropped = true; } /// <summary> /// ProcessQueue would be regarded as expired if no fetch message for a long time. /// </summary> /// <returns>if it is expired.</returns> public bool Expired() { var longPollingTimeout = _consumer.GetPushConsumerSettings().GetLongPollingTimeout(); var requestTimeout = _consumer.GetClientConfig().RequestTimeout; var maxIdleDuration = longPollingTimeout.Add(requestTimeout).Multiply(3); var idleDuration = DateTime.UtcNow.Ticks - Interlocked.Read(ref _activityTime); if (idleDuration < maxIdleDuration.Ticks) { return false; } var afterCacheFullDuration = DateTime.UtcNow.Ticks - Interlocked.Read(ref _cacheFullTime); if (afterCacheFullDuration < maxIdleDuration.Ticks) { return false; } Logger.LogWarning( $"Process queue is idle, idleDuration={idleDuration}, maxIdleDuration={maxIdleDuration}," + $" afterCacheFullDuration={afterCacheFullDuration}, mq={_mq}, clientId={_consumer.GetClientId()}"); return true; } internal void CacheMessages(List<MessageView> messageList) { _cachedMessageLock.EnterWriteLock(); try { foreach (var messageView in messageList) { _cachedMessages.Add(messageView); Interlocked.Add(ref _cachedMessagesBytes, messageView.Body.Length); } } finally { _cachedMessageLock.ExitWriteLock(); } } private int GetReceptionBatchSize() { var bufferSize = _consumer.CacheMessageCountThresholdPerQueue() - CachedMessagesCount(); bufferSize = Math.Max(bufferSize, 1); return Math.Min(bufferSize, _consumer.GetPushConsumerSettings().GetReceiveBatchSize()); } /// <summary> /// Start to fetch messages from remote immediately. /// </summary> public void FetchMessageImmediately() { ReceiveMessageImmediately(); } /// <summary> /// Receive message later by message queue. /// </summary> /// <remarks> /// Make sure that no exception will be thrown. /// </remarks> public void OnReceiveMessageException(Exception t, string attemptId) { var delay = t is TooManyRequestsException ? ReceivingFlowControlBackoffDelay : ReceivingFailureBackoffDelay; ReceiveMessageLater(delay, attemptId); } private void ReceiveMessageLater(TimeSpan delay, string attemptId) { var clientId = _consumer.GetClientId(); Logger.LogInformation($"Try to receive message later, mq={_mq}, delay={delay}, clientId={clientId}"); Task.Run(async () => { try { await Task.Delay(delay, _receiveMsgCts.Token); ReceiveMessage(attemptId); } catch (Exception ex) { if (_receiveMsgCts.IsCancellationRequested) { return; } Logger.LogError(ex, $"[Bug] Failed to schedule message receiving request, mq={_mq}, clientId={clientId}"); OnReceiveMessageException(ex, attemptId); } }); } private string GenerateAttemptId() { return Guid.NewGuid().ToString(); } public void ReceiveMessage() { ReceiveMessage(GenerateAttemptId()); } public void ReceiveMessage(string attemptId) { var clientId = _consumer.GetClientId(); if (_dropped) { Logger.LogInformation($"Process queue has been dropped, no longer receive message, mq={_mq}, clientId={clientId}"); return; } if (IsCacheFull()) { Logger.LogWarning($"Process queue cache is full, would receive message later, mq={_mq}, clientId={clientId}"); ReceiveMessageLater(ReceivingBackoffDelayWhenCacheIsFull, attemptId); return; } ReceiveMessageImmediately(attemptId); } private void ReceiveMessageImmediately() { ReceiveMessageImmediately(GenerateAttemptId()); } private void ReceiveMessageImmediately(string attemptId) { var clientId = _consumer.GetClientId(); if (_consumer.State != State.Running) { Logger.LogInformation($"Stop to receive message because consumer is not running, mq={_mq}, clientId={clientId}"); return; } try { var endpoints = _mq.Broker.Endpoints; var batchSize = GetReceptionBatchSize(); var longPollingTimeout = _consumer.GetPushConsumerSettings().GetLongPollingTimeout(); var request = _consumer.WrapReceiveMessageRequest(batchSize, _mq, _filterExpression, longPollingTimeout, attemptId); Interlocked.Exchange(ref _activityTime, DateTime.UtcNow.Ticks); var task = _consumer.ReceiveMessage(request, _mq, longPollingTimeout); task.ContinueWith(t => { if (t.IsFaulted) { string nextAttemptId = null; if (t.Exception is { InnerException: RpcException { StatusCode: StatusCode.DeadlineExceeded } }) { nextAttemptId = request.AttemptId; } Logger.LogError(t.Exception, $"Exception raised during message reception, mq={_mq}," + $" attemptId={request.AttemptId}, nextAttemptId={nextAttemptId}," + $" clientId={clientId}"); OnReceiveMessageException(t.Exception, nextAttemptId); } else { try { var result = t.Result; OnReceiveMessageResult(result); } catch (Exception ex) { // Should never reach here. Logger.LogError($"[Bug] Exception raised while handling receive result, mq={_mq}," + $" endpoints={endpoints}, clientId={clientId}, exception={ex}"); OnReceiveMessageException(ex, attemptId); } } }, TaskContinuationOptions.ExecuteSynchronously); } catch (Exception ex) { Logger.LogError(ex, $"Exception raised during message reception, mq={_mq}, clientId={clientId}"); OnReceiveMessageException(ex, attemptId); } } private void OnReceiveMessageResult(ReceiveMessageResult result) { var messages = result.Messages; if (messages.Count > 0) { CacheMessages(messages); _consumer.GetConsumeService().Consume(this, messages); } ReceiveMessage(); } private bool IsCacheFull() { var cacheMessageCountThresholdPerQueue = _consumer.CacheMessageCountThresholdPerQueue(); var actualMessagesQuantity = CachedMessagesCount(); var clientId = _consumer.GetClientId(); if (cacheMessageCountThresholdPerQueue <= actualMessagesQuantity) { Logger.LogWarning($"Process queue total cached messages quantity exceeds the threshold," + $" threshold={cacheMessageCountThresholdPerQueue}, actual={actualMessagesQuantity}," + $" mq={_mq}, clientId={clientId}"); Interlocked.Exchange(ref _cacheFullTime, DateTime.UtcNow.Ticks); return true; } var cacheMessageBytesThresholdPerQueue = _consumer.CacheMessageBytesThresholdPerQueue(); var actualCachedMessagesBytes = CachedMessageBytes(); if (cacheMessageBytesThresholdPerQueue <= actualCachedMessagesBytes) { Logger.LogWarning($"Process queue total cached messages memory exceeds the threshold," + $" threshold={cacheMessageBytesThresholdPerQueue} bytes," + $" actual={actualCachedMessagesBytes} bytes, mq={_mq}, clientId={clientId}"); Interlocked.Exchange(ref _cacheFullTime, DateTime.UtcNow.Ticks); return true; } return false; } /// <summary> /// Erase messages(Non-FIFO-consume-mode) which have been consumed properly. /// </summary> /// <param name="messageView">the message to erase.</param> /// <param name="consumeResult">consume result.</param> public void EraseMessage(MessageView messageView, ConsumeResult consumeResult) { var task = ConsumeResult.SUCCESS.Equals(consumeResult) ? AckMessage(messageView) : NackMessage(messageView); _ = task.ContinueWith(_ => { EvictCache(messageView); }, TaskContinuationOptions.ExecuteSynchronously); } private Task AckMessage(MessageView messageView) { var tcs = new TaskCompletionSource<bool>(); AckMessage(messageView, 1, tcs); return tcs.Task; } private void AckMessage(MessageView messageView, int attempt, TaskCompletionSource<bool> tcs) { var clientId = _consumer.GetClientId(); var consumerGroup = _consumer.GetConsumerGroup(); var messageId = messageView.MessageId; var endpoints = messageView.MessageQueue.Broker.Endpoints; var request = _consumer.WrapAckMessageRequest(messageView); var task = _consumer.GetClientManager().AckMessage(messageView.MessageQueue.Broker.Endpoints, request, _consumer.GetClientConfig().RequestTimeout); task.ContinueWith(responseTask => { if (responseTask.IsFaulted) { Logger.LogError(responseTask.Exception, $"Exception raised while acknowledging message," + $" would retry later, clientId={clientId}," + $" consumerGroup={consumerGroup}," + $" messageId={messageId}," + $" mq={_mq}, endpoints={endpoints}"); AckMessageLater(messageView, attempt + 1, tcs); } else { var invocation = responseTask.Result; var requestId = invocation.RequestId; var status = invocation.Response.Status; var statusCode = status.Code; if (statusCode == Code.InvalidReceiptHandle) { Logger.LogError($"Failed to ack message due to the invalid receipt handle, forgive to retry," + $" clientId={clientId}, consumerGroup={consumerGroup}, messageId={messageId}," + $" attempt={attempt}, mq={_mq}, endpoints={endpoints}, requestId={requestId}," + $" status message={status.Message}"); tcs.SetException(new BadRequestException((int)statusCode, requestId, status.Message)); } if (statusCode != Code.Ok) { Logger.LogError( $"Failed to change invisible duration, would retry later, clientId={clientId}," + $" consumerGroup={consumerGroup}, messageId={messageId}, attempt={attempt}, mq={_mq}," + $" endpoints={endpoints}, requestId={requestId}, status message={status.Message}"); AckMessageLater(messageView, attempt + 1, tcs); return; } tcs.SetResult(true); if (attempt > 1) { Logger.LogInformation($"Successfully acked message finally, clientId={clientId}," + $" consumerGroup={consumerGroup}, messageId={messageId}," + $" attempt={attempt}, mq={_mq}, endpoints={endpoints}," + $" requestId={requestId}"); } else { Logger.LogDebug($"Successfully acked message, clientId={clientId}," + $" consumerGroup={consumerGroup}, messageId={messageId}, mq={_mq}," + $" endpoints={endpoints}, requestId={requestId}"); } } }, TaskContinuationOptions.ExecuteSynchronously); } private void AckMessageLater(MessageView messageView, int attempt, TaskCompletionSource<bool> tcs) { Task.Run(async () => { try { await Task.Delay(AckMessageFailureBackoffDelay, _ackMsgCts.Token); AckMessage(messageView, attempt + 1, tcs); } catch (Exception ex) { if (_ackMsgCts.IsCancellationRequested) { return; } Logger.LogError(ex, $"[Bug] Failed to schedule message ack request, mq={_mq}," + $" messageId={messageView.MessageId}, clientId={_consumer.GetClientId()}"); AckMessageLater(messageView, attempt + 1, tcs); } }); } private Task NackMessage(MessageView messageView) { var deliveryAttempt = messageView.DeliveryAttempt; var duration = _consumer.GetRetryPolicy().GetNextAttemptDelay(deliveryAttempt); var tcs = new TaskCompletionSource<bool>(); ChangeInvisibleDuration(messageView, duration, 1, tcs); return tcs.Task; } private void ChangeInvisibleDuration(MessageView messageView, TimeSpan duration, int attempt, TaskCompletionSource<bool> tcs) { var clientId = _consumer.GetClientId(); var consumerGroup = _consumer.GetConsumerGroup(); var messageId = messageView.MessageId; var endpoints = messageView.MessageQueue.Broker.Endpoints; var request = _consumer.WrapChangeInvisibleDuration(messageView, duration); var task = _consumer.GetClientManager().ChangeInvisibleDuration(endpoints, request, _consumer.GetClientConfig().RequestTimeout); task.ContinueWith(responseTask => { if (responseTask.IsFaulted) { Logger.LogError(responseTask.Exception, $"Exception raised while changing invisible" + $" duration, would retry later, clientId={clientId}," + $" consumerGroup={consumerGroup}," + $" messageId={messageId}, mq={_mq}," + $" endpoints={endpoints}"); ChangeInvisibleDurationLater(messageView, duration, attempt + 1, tcs); } else { var invocation = responseTask.Result; var requestId = invocation.RequestId; var status = invocation.Response.Status; var statusCode = status.Code; if (statusCode == Code.InvalidReceiptHandle) { Logger.LogError($"Failed to change invisible duration due to the invalid receipt handle," + $" forgive to retry, clientId={clientId}, consumerGroup={consumerGroup}," + $" messageId={messageId}, attempt={attempt}, mq={_mq}, endpoints={endpoints}," + $" requestId={requestId}, status message={status.Message}"); tcs.SetException(new BadRequestException((int)statusCode, requestId, status.Message)); } if (statusCode != Code.Ok) { Logger.LogError($"Failed to change invisible duration, would retry later," + $" clientId={clientId}, consumerGroup={consumerGroup}, messageId={messageId}," + $" attempt={attempt}, mq={_mq}, endpoints={endpoints}, requestId={requestId}," + $" status message={status.Message}"); ChangeInvisibleDurationLater(messageView, duration, attempt + 1, tcs); return; } tcs.SetResult(true); if (attempt > 1) { Logger.LogInformation($"Finally, changed invisible duration successfully," + $" clientId={clientId}, consumerGroup={consumerGroup}," + $" messageId={messageId}, attempt={attempt}, mq={_mq}," + $" endpoints={endpoints}, requestId={requestId}"); } else { Logger.LogDebug($"Changed invisible duration successfully, clientId={clientId}," + $" consumerGroup={consumerGroup}, messageId={messageId}, mq={_mq}," + $" endpoints={endpoints}, requestId={requestId}"); } } }); } private void ChangeInvisibleDurationLater(MessageView messageView, TimeSpan duration, int attempt, TaskCompletionSource<bool> tcs) { Task.Run(async () => { try { await Task.Delay(ChangeInvisibleDurationFailureBackoffDelay, _changeInvisibleDurationCts.Token); ChangeInvisibleDuration(messageView, duration, attempt, tcs); } catch (Exception ex) { if (_changeInvisibleDurationCts.IsCancellationRequested) { return; } Logger.LogError(ex, $"[Bug] Failed to schedule message change invisible duration request," + $" mq={_mq}, messageId={messageView.MessageId}, clientId={_consumer.GetClientId()}"); ChangeInvisibleDurationLater(messageView, duration, attempt + 1, tcs); } }); } public Task EraseFifoMessage(MessageView messageView, ConsumeResult consumeResult) { var retryPolicy = _consumer.GetRetryPolicy(); var maxAttempts = retryPolicy.GetMaxAttempts(); var attempt = messageView.DeliveryAttempt; var messageId = messageView.MessageId; var service = _consumer.GetConsumeService(); var clientId = _consumer.GetClientId(); if (consumeResult == ConsumeResult.FAILURE && attempt < maxAttempts) { var nextAttemptDelay = retryPolicy.GetNextAttemptDelay(attempt); attempt = messageView.IncrementAndGetDeliveryAttempt(); Logger.LogDebug($"Prepare to redeliver the fifo message because of the consumption failure," + $" maxAttempt={maxAttempts}, attempt={attempt}, mq={messageView.MessageQueue}," + $" messageId={messageId}, nextAttemptDelay={nextAttemptDelay}, clientId={clientId}"); var redeliverTask = service.Consume(messageView, nextAttemptDelay); _ = redeliverTask.ContinueWith(async t => { var result = await t; await EraseFifoMessage(messageView, result); }, TaskContinuationOptions.ExecuteSynchronously); } else { var success = consumeResult == ConsumeResult.SUCCESS; if (!success) { Logger.LogInformation($"Failed to consume fifo message finally, run out of attempt times," + $" maxAttempts={maxAttempts}, attempt={attempt}, mq={messageView.MessageQueue}," + $" messageId={messageId}, clientId={clientId}"); } var task = ConsumeResult.SUCCESS.Equals(consumeResult) ? AckMessage(messageView) : ForwardToDeadLetterQueue(messageView); _ = task.ContinueWith(_ => { EvictCache(messageView); }, TaskContinuationOptions.ExecuteSynchronously); } return Task.CompletedTask; } private Task ForwardToDeadLetterQueue(MessageView messageView) { var tcs = new TaskCompletionSource<bool>(); ForwardToDeadLetterQueue(messageView, 1, tcs); return tcs.Task; } private void ForwardToDeadLetterQueue(MessageView messageView, int attempt, TaskCompletionSource<bool> tcs) { var clientId = _consumer.GetClientId(); var consumerGroup = _consumer.GetConsumerGroup(); var messageId = messageView.MessageId; var endpoints = messageView.MessageQueue.Broker.Endpoints; var request = _consumer.WrapForwardMessageToDeadLetterQueueRequest(messageView); var task = _consumer.GetClientManager().ForwardMessageToDeadLetterQueue(endpoints, request, _consumer.GetClientConfig().RequestTimeout); task.ContinueWith(responseTask => { if (responseTask.IsFaulted) { // Log failure and retry later. Logger.LogError($"Exception raised while forward message to DLQ, would attempt to re-forward later, " + $"clientId={_consumer.GetClientId()}," + $" consumerGroup={_consumer.GetConsumerGroup()}," + $" messageId={messageView.MessageId}, mq={_mq}", responseTask.Exception); ForwardToDeadLetterQueueLater(messageView, attempt, tcs); } else { var invocation = responseTask.Result; var requestId = invocation.RequestId; var status = invocation.Response.Status; var statusCode = status.Code; // Log failure and retry later. if (statusCode != Code.Ok) { Logger.LogError($"Failed to forward message to dead letter queue," + $" would attempt to re-forward later, clientId={clientId}," + $" consumerGroup={consumerGroup}, messageId={messageId}," + $" attempt={attempt}, mq={_mq}, endpoints={endpoints}," + $" requestId={requestId}, code={statusCode}," + $" status message={status.Message}"); ForwardToDeadLetterQueueLater(messageView, attempt, tcs); return; } tcs.SetResult(true); // Log success. if (attempt > 1) { Logger.LogInformation($"Re-forward message to dead letter queue successfully, " + $"clientId={clientId}, consumerGroup={consumerGroup}," + $" attempt={attempt}, messageId={messageId}, mq={_mq}," + $" endpoints={endpoints}, requestId={requestId}"); } else { Logger.LogInformation($"Forward message to dead letter queue successfully, " + $"clientId={clientId}, consumerGroup={consumerGroup}," + $" messageId={messageId}, mq={_mq}, endpoints={endpoints}," + $" requestId={requestId}"); } } }); } private void ForwardToDeadLetterQueueLater(MessageView messageView, int attempt, TaskCompletionSource<bool> tcs) { Task.Run(async () => { try { await Task.Delay(ForwardMessageToDeadLetterQueueFailureBackoffDelay, _forwardMessageToDeadLetterQueueCts.Token); ForwardToDeadLetterQueue(messageView, attempt, tcs); } catch (Exception ex) { // Should never reach here. Logger.LogError($"[Bug] Failed to schedule DLQ message request, " + $"mq={_mq}, messageId={messageView.MessageId}, clientId={_consumer.GetClientId()}", ex); ForwardToDeadLetterQueueLater(messageView, attempt + 1, tcs); } }); } /// <summary> /// Discard the message(Non-FIFO-consume-mode) which could not be consumed properly. /// </summary> /// <param name="messageView">the message to discard.</param> public void DiscardMessage(MessageView messageView) { Logger.LogInformation($"Discard message, mq={_mq}, messageId={messageView.MessageId}," + $" clientId={_consumer.GetClientId()}"); var task = NackMessage(messageView); _ = task.ContinueWith(_ => { EvictCache(messageView); }, TaskContinuationOptions.ExecuteSynchronously); } /// <summary> /// Discard the message(FIFO-consume-mode) which could not consumed properly. /// </summary> /// <param name="messageView">the FIFO message to discard.</param> public void DiscardFifoMessage(MessageView messageView) { Logger.LogInformation($"Discard fifo message, mq={_mq}, messageId={messageView.MessageId}," + $" clientId={_consumer.GetClientId()}"); var task = ForwardToDeadLetterQueue(messageView); _ = task.ContinueWith(_ => { EvictCache(messageView); }, TaskContinuationOptions.ExecuteSynchronously); } private void EvictCache(MessageView messageView) { _cachedMessageLock.EnterWriteLock(); try { if (_cachedMessages.Remove(messageView)) { Interlocked.Add(ref _cachedMessagesBytes, -messageView.Body.Length); } } finally { _cachedMessageLock.ExitWriteLock(); } } public int CachedMessagesCount() { _cachedMessageLock.EnterReadLock(); try { return _cachedMessages.Count; } finally { _cachedMessageLock.ExitReadLock(); } } public long CachedMessageBytes() { return Interlocked.Read(ref _cachedMessagesBytes); } /// <summary> /// Get the count of cached messages. /// </summary> /// <returns>count of pending messages.</returns> public long GetCachedMessageCount() { _cachedMessageLock.EnterReadLock(); try { return _cachedMessages.Count; } finally { _cachedMessageLock.ExitReadLock(); } } /// <summary> /// Get the bytes of cached message memory footprint. /// </summary> /// <returns>bytes of cached message memory footprint.</returns> public long GetCachedMessageBytes() { return _cachedMessagesBytes; } } }