csharp/rocketmq-client-csharp/PushConsumer.cs (553 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Schedulers;
using Apache.Rocketmq.V2;
using Google.Protobuf.WellKnownTypes;
using Proto = Apache.Rocketmq.V2;
using Microsoft.Extensions.Logging;
[assembly: InternalsVisibleTo("tests")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
namespace Org.Apache.Rocketmq
{
public class PushConsumer : Consumer, IAsyncDisposable, IDisposable
{
// Logger shared by every PushConsumer instance.
private static readonly ILogger Logger = MqLogManager.CreateLogger<PushConsumer>();
// Initial delay and repeat period handed to ScheduleWithFixedDelay for the assignment scan (see Start).
private static readonly TimeSpan AssignmentScanScheduleDelay = TimeSpan.FromSeconds(1);
private static readonly TimeSpan AssignmentScanSchedulePeriod = TimeSpan.FromSeconds(5);
private readonly ClientConfig _clientConfig;
private readonly PushSubscriptionSettings _pushSubscriptionSettings;
private readonly string _consumerGroup;
// Topic -> filter expression; mutated by Subscribe/Unsubscribe and enumerated by ScanAssignments.
private readonly ConcurrentDictionary<string, FilterExpression> _subscriptionExpressions;
// Topic -> assignments most recently stored by ScanAssignments.
private readonly ConcurrentDictionary<string, Assignments> _cacheAssignments;
private readonly IMessageListener _messageListener;
// Total cache limits; divided by the number of process queues in the *ThresholdPerQueue methods.
private readonly int _maxCacheMessageCount;
private readonly int _maxCacheMessageSizeInBytes;
// Message queue -> process queue; entries are added by CreateProcessQueue and removed by DropProcessQueue.
private readonly ConcurrentDictionary<MessageQueue, ProcessQueue> _processQueueTable;
// Assigned in Start(), so it is null until the consumer has been started.
private ConsumeService _consumeService;
// Scheduler and cancellation source passed to the consume service created in CreateConsumerService.
private readonly TaskScheduler _consumptionTaskScheduler;
private readonly CancellationTokenSource _consumptionCts;
// Its token is passed to the periodic assignment scan; cancelled in Shutdown.
private readonly CancellationTokenSource _scanAssignmentCts;
// The following sources are passed to every ProcessQueue (see CreateProcessQueue) and cancelled in Shutdown.
private readonly CancellationTokenSource _receiveMsgCts;
private readonly CancellationTokenSource _ackMsgCts;
private readonly CancellationTokenSource _changeInvisibleDurationCts;
private readonly CancellationTokenSource _forwardMsgToDeadLetterQueueCts;
/// <summary>
/// Creates a push consumer from already-validated arguments. Validation (throwing or logging) is the
/// caller's responsibility, so no argument checks are repeated here.
/// </summary>
/// <param name="clientConfig">The client configuration, also passed to the base class.</param>
/// <param name="consumerGroup">The consumer group, also passed to the base class.</param>
/// <param name="subscriptionExpressions">Topic to filter expression map; stored by reference.</param>
/// <param name="messageListener">The listener handed to the consume service.</param>
/// <param name="maxCacheMessageCount">Total cached message count limit, shared among process queues.</param>
/// <param name="maxCacheMessageSizeInBytes">Total cached message size limit, shared among process queues.</param>
/// <param name="consumptionThreadCount">Concurrency level of the consumption task scheduler.</param>
public PushConsumer(ClientConfig clientConfig, string consumerGroup,
    ConcurrentDictionary<string, FilterExpression> subscriptionExpressions, IMessageListener messageListener,
    int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount)
    : base(clientConfig, consumerGroup)
{
    // Values taken as-is from the arguments.
    _clientConfig = clientConfig;
    _consumerGroup = consumerGroup;
    _subscriptionExpressions = subscriptionExpressions;
    _messageListener = messageListener;
    _maxCacheMessageCount = maxCacheMessageCount;
    _maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes;

    // Settings and bookkeeping tables.
    _pushSubscriptionSettings = new PushSubscriptionSettings(clientConfig.Namespace, ClientId, Endpoints,
        consumerGroup, clientConfig.RequestTimeout, subscriptionExpressions);
    _cacheAssignments = new ConcurrentDictionary<string, Assignments>();
    _processQueueTable = new ConcurrentDictionary<MessageQueue, ProcessQueue>();

    // Consumption scheduler plus one cancellation source per background activity.
    _consumptionTaskScheduler = new LimitedConcurrencyLevelTaskScheduler(consumptionThreadCount);
    _scanAssignmentCts = new CancellationTokenSource();
    _consumptionCts = new CancellationTokenSource();
    _receiveMsgCts = new CancellationTokenSource();
    _ackMsgCts = new CancellationTokenSource();
    _changeInvisibleDurationCts = new CancellationTokenSource();
    _forwardMsgToDeadLetterQueueCts = new CancellationTokenSource();
}
/// <summary>
/// Starts the base client, creates the consume service and schedules the periodic assignment scan.
/// The state becomes <c>Running</c> on success and <c>Failed</c> (with the exception rethrown) otherwise.
/// </summary>
protected override async Task Start()
{
    try
    {
        State = State.Starting;
        Logger.LogInformation($"Begin to start the rocketmq push consumer, clientId={ClientId}");

        await base.Start();
        _consumeService = CreateConsumerService();

        // The scan keeps running until _scanAssignmentCts is cancelled in Shutdown.
        var scanToken = _scanAssignmentCts.Token;
        ScheduleWithFixedDelay(ScanAssignments, AssignmentScanScheduleDelay, AssignmentScanSchedulePeriod,
            scanToken);

        Logger.LogInformation($"The rocketmq push consumer starts successfully, clientId={ClientId}");
        State = State.Running;
    }
    catch (Exception)
    {
        State = State.Failed;
        throw;
    }
}
/// <summary>
/// Asynchronously shuts the consumer down and suppresses finalization.
/// </summary>
public async ValueTask DisposeAsync()
{
    var shutdownTask = Shutdown();
    await shutdownTask.ConfigureAwait(false);
    GC.SuppressFinalize(this);
}
/// <summary>
/// Shuts the consumer down, blocking the calling thread until shutdown completes, and suppresses
/// finalization. Prefer <see cref="DisposeAsync"/> where the caller can await.
/// </summary>
public void Dispose()
{
    // Task.Wait surfaces shutdown failures wrapped in an AggregateException.
    var shutdownTask = Shutdown();
    shutdownTask.Wait();
    GC.SuppressFinalize(this);
}
/// <summary>
/// Cancels the background activities, shuts the base client down and finally cancels consumption.
/// The state becomes <c>Terminated</c> on success and <c>Failed</c> (with the exception rethrown) otherwise.
/// </summary>
protected override async Task Shutdown()
{
    try
    {
        State = State.Stopping;
        Logger.LogInformation($"Begin to shutdown the rocketmq push consumer, clientId={ClientId}");

        // Cancelled before the base client shuts down, in this exact order.
        var backgroundSources = new[]
        {
            _receiveMsgCts,
            _ackMsgCts,
            _changeInvisibleDurationCts,
            _forwardMsgToDeadLetterQueueCts,
            _scanAssignmentCts
        };
        foreach (var source in backgroundSources)
        {
            source.Cancel();
        }

        await base.Shutdown();

        // Consumption is cancelled only after the base client has shut down.
        _consumptionCts.Cancel();
        Logger.LogInformation($"Shutdown the rocketmq push consumer successfully, clientId={ClientId}");
        State = State.Terminated;
    }
    catch (Exception)
    {
        State = State.Failed;
        throw;
    }
}
/// <summary>
/// Creates the consume service matching the subscription settings: a FIFO service when the settings
/// report FIFO, a standard service otherwise.
/// </summary>
/// <returns>The newly created consume service.</returns>
private ConsumeService CreateConsumerService()
{
    if (!_pushSubscriptionSettings.IsFifo())
    {
        Logger.LogInformation(
            $"Create standard consume service, consumerGroup={_consumerGroup}, clientId={ClientId}");
        return new StandardConsumeService(ClientId, _messageListener, _consumptionTaskScheduler,
            _consumptionCts.Token);
    }

    Logger.LogInformation(
        $"Create FIFO consume service, consumerGroup={_consumerGroup}, clientId={ClientId}");
    return new FifoConsumeService(ClientId, _messageListener, _consumptionTaskScheduler,
        _consumptionCts.Token);
}
/// <summary>
/// Adds a subscription expression dynamically, replacing any expression already registered for the topic.
/// </summary>
/// <remarks>
/// The route data of the topic is fetched first; the expression is registered only after that succeeds.
/// </remarks>
/// <param name="topic">The topic to subscribe to.</param>
/// <param name="filterExpression">The filter expression to use for the topic.</param>
/// <returns>A task that completes once the subscription has been registered.</returns>
/// <exception cref="InvalidOperationException">The push consumer is not running.</exception>
public async Task Subscribe(string topic, FilterExpression filterExpression)
{
    var running = State.Running == State;
    if (!running)
    {
        throw new InvalidOperationException("Push consumer is not running");
    }

    await GetRouteData(topic);
    _subscriptionExpressions[topic] = filterExpression;
}
/// <summary>
/// Removes a subscription expression dynamically by topic.
/// </summary>
/// <remarks>
/// It stops the backend task to fetch messages from the server.
/// The locally cached messages whose topic was removed before would not be delivered
/// to the <see cref="IMessageListener"/> anymore.
///
/// Nothing occurs if the specified topic does not exist in subscription expressions
/// of the push consumer.
/// </remarks>
/// <param name="topic">The topic to remove the subscription.</param>
/// <exception cref="InvalidOperationException">The push consumer is not running.</exception>
public void Unsubscribe(string topic)
{
    var running = State.Running == State;
    if (!running)
    {
        throw new InvalidOperationException("Push consumer is not running");
    }

    // The removed expression itself is of no interest.
    _subscriptionExpressions.TryRemove(topic, out _);
}
/// <summary>
/// Queries the latest assignments of every subscribed topic and reconciles the local process queues
/// with the result.
/// </summary>
/// <remarks>
/// Each query is started without being awaited and its result is handled in a continuation. Neither a
/// failed or canceled query nor an exception raised while reconciling propagates to the caller; all of
/// them are logged, and the next scheduled scan retries.
/// </remarks>
internal void ScanAssignments()
{
    try
    {
        Logger.LogDebug($"Start to scan assignments periodically, clientId={ClientId}");
        foreach (var (topic, filterExpression) in _subscriptionExpressions)
        {
            // Snapshot taken before the query is issued; the continuation compares against it.
            var existed = _cacheAssignments.GetValueOrDefault(topic);
            var queryAssignmentTask = QueryAssignment(topic);
            queryAssignmentTask.ContinueWith(task =>
            {
                // The continuation task is discarded, so anything thrown here would otherwise be lost
                // as an unobserved task exception. Catch and log instead.
                try
                {
                    if (task.IsFaulted)
                    {
                        Logger.LogError(task.Exception, "Exception raised while scanning the assignments," +
                                                        $" topic={topic}, clientId={ClientId}");
                        return;
                    }

                    // Reading Result of a canceled task throws, so bail out explicitly.
                    if (task.IsCanceled)
                    {
                        Logger.LogWarning("Querying the assignments was canceled, would scan later," +
                                          $" topic={topic}, clientId={ClientId}");
                        return;
                    }

                    var latest = task.Result;
                    if (latest.GetAssignmentList().Count == 0)
                    {
                        if (existed == null || existed.GetAssignmentList().Count == 0)
                        {
                            Logger.LogInformation("Acquired empty assignments from remote, would scan later," +
                                                  $" topic={topic}, clientId={ClientId}");
                            return;
                        }

                        Logger.LogInformation("Attention!!! acquired empty assignments from remote, but" +
                                              $" existed assignments are not empty, topic={topic}," +
                                              $" clientId={ClientId}");
                    }

                    if (!latest.Equals(existed))
                    {
                        Logger.LogInformation($"Assignments of topic={topic} has changed, {existed} =>" +
                                              $" {latest}, clientId={ClientId}");
                        SyncProcessQueue(topic, latest, filterExpression);
                        _cacheAssignments[topic] = latest;
                        return;
                    }

                    Logger.LogDebug($"Assignments of topic={topic} remain the same," +
                                    $" assignments={existed}, clientId={ClientId}");
                    // Process queue may be dropped, need to be synchronized anyway.
                    SyncProcessQueue(topic, latest, filterExpression);
                }
                catch (Exception ex)
                {
                    Logger.LogError(ex, "Exception raised while synchronizing the process queues," +
                                        $" topic={topic}, clientId={ClientId}");
                }
            }, TaskContinuationOptions.ExecuteSynchronously);
        }
    }
    catch (Exception ex)
    {
        Logger.LogError(ex, $"Exception raised while scanning the assignments for all topics, clientId={ClientId}");
    }
}
/// <summary>
/// Reconciles the process queues of a topic with the given assignments: queues that are no longer
/// assigned or have expired are dropped, and a process queue is created (and told to fetch
/// immediately) for every assigned message queue that has no retained process queue.
/// </summary>
/// <param name="topic">The topic whose process queues are reconciled.</param>
/// <param name="assignments">The assignments to reconcile against.</param>
/// <param name="filterExpression">The filter expression used for newly created process queues.</param>
private void SyncProcessQueue(string topic, Assignments assignments, FilterExpression filterExpression)
{
    // Message queues contained in the given assignments.
    var assigned = new HashSet<MessageQueue>();
    foreach (var assignment in assignments.GetAssignmentList())
    {
        assigned.Add(assignment.MessageQueue);
    }

    // Existing process queues of this topic that survive the reconciliation.
    var retained = new HashSet<MessageQueue>();
    foreach (var (mq, pq) in _processQueueTable)
    {
        if (!topic.Equals(mq.Topic))
        {
            continue;
        }

        if (!assigned.Contains(mq))
        {
            Logger.LogInformation($"Drop message queue according to the latest assignmentList," +
                                  $" mq={mq}, clientId={ClientId}");
            DropProcessQueue(mq);
        }
        else if (pq.Expired())
        {
            Logger.LogWarning($"Drop message queue because it is expired," +
                              $" mq={mq}, clientId={ClientId}");
            DropProcessQueue(mq);
        }
        else
        {
            retained.Add(mq);
        }
    }

    foreach (var mq in assigned)
    {
        if (!retained.Contains(mq))
        {
            var created = CreateProcessQueue(mq, filterExpression);
            if (created != null)
            {
                Logger.LogInformation($"Start to fetch message from remote, mq={mq}, clientId={ClientId}");
                created.FetchMessageImmediately();
            }
        }
    }
}
/// <summary>
/// Queries the assignments of a topic: picks the endpoints to query, sends the query assignment
/// request and converts the response into <see cref="Assignments"/>.
/// </summary>
/// <param name="topic">The topic whose assignments are queried.</param>
/// <returns>
/// A task producing the assignments. It faults when picking the endpoints fails, when the request
/// fails, or when the status check or the conversion throws.
/// </returns>
internal Task<Assignments> QueryAssignment(string topic)
{
var pickEndpointsTask = PickEndpointsToQueryAssignments(topic);
return pickEndpointsTask.ContinueWith(task0 =>
{
// Rethrow the antecedent's AggregateException so the returned task faults as well.
if (task0 is { IsFaulted: true, Exception: { } })
{
throw task0.Exception;
}
var endpoints = task0.Result;
var request = WrapQueryAssignmentRequest(topic);
var requestTimeout = _clientConfig.RequestTimeout;
var queryAssignmentTask = ClientManager.QueryAssignment(endpoints, request, requestTimeout);
return queryAssignmentTask.ContinueWith(task1 =>
{
if (task1 is { IsFaulted: true, Exception: { } })
{
throw task1.Exception;
}
var response = task1.Result.Response;
var status = response.Status;
// NOTE(review): presumably throws when the status is not OK - confirm against StatusChecker.
StatusChecker.Check(status, request, task1.Result.RequestId);
// Keep only the message queue of each protobuf assignment.
var assignmentList = response.Assignments
.Select(assignment => new Assignment(new MessageQueue(assignment.MessageQueue)))
.ToList();
return Task.FromResult(new Assignments(assignmentList));
// Each continuation returns a task, so both levels are unwrapped to yield Task<Assignments>.
}, TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}, TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}
/// <summary>
/// Fetches the route data of a topic and picks the endpoints to send the assignment query to.
/// </summary>
/// <param name="topic">The topic whose route data is used.</param>
/// <returns>A task producing the picked endpoints; it faults when fetching the route data fails.</returns>
private Task<Endpoints> PickEndpointsToQueryAssignments(string topic)
{
    return GetRouteData(topic).ContinueWith(routeTask =>
    {
        // Rethrow the antecedent's AggregateException so the returned task faults as well.
        if (routeTask != null && routeTask.IsFaulted && routeTask.Exception != null)
        {
            throw routeTask.Exception;
        }

        return routeTask.Result.PickEndpointsToQueryAssignments();
    }, TaskContinuationOptions.ExecuteSynchronously);
}
/// <summary>
/// Builds the protobuf request used to query the assignments of a topic for this consumer group.
/// </summary>
/// <param name="topic">The topic name placed into the request.</param>
/// <returns>The query assignment request.</returns>
private QueryAssignmentRequest WrapQueryAssignmentRequest(string topic)
{
    return new QueryAssignmentRequest
    {
        Topic = new Proto.Resource
        {
            ResourceNamespace = _clientConfig.Namespace,
            Name = topic
        },
        Group = GetProtobufGroup(),
        Endpoints = Endpoints.ToProtobuf()
    };
}
/// <summary>
/// Drops the <see cref="ProcessQueue"/> mapped to the given <see cref="MessageQueue"/>.
/// The process queue is removed from the table first and dropped afterwards; nothing happens when
/// no process queue is mapped to the message queue.
/// </summary>
/// <param name="mq">The message queue.</param>
internal void DropProcessQueue(MessageQueue mq)
{
    var removed = _processQueueTable.TryRemove(mq, out var processQueue);
    if (!removed)
    {
        return;
    }

    processQueue.Drop();
}
/// <summary>
/// Creates a process queue and adds it into the <see cref="_processQueueTable"/>.
/// Returns <c>null</c> if a process queue is already mapped to the message queue.
/// </summary>
/// <remarks>
/// This function and <see cref="DropProcessQueue"/> ensure that a process queue is not dropped if
/// it is contained in <see cref="_processQueueTable"/>. Once a process queue is dropped, it must have been
/// removed from <see cref="_processQueueTable"/>.
/// </remarks>
/// <param name="mq">The message queue.</param>
/// <param name="filterExpression">The filter expression of the topic.</param>
/// <returns>
/// The newly registered process queue, or <c>null</c> when the table already holds one for
/// <paramref name="mq"/>.
/// </returns>
protected ProcessQueue CreateProcessQueue(MessageQueue mq, FilterExpression filterExpression)
{
    var processQueue = new ProcessQueue(this, mq, filterExpression, _receiveMsgCts, _ackMsgCts,
        _changeInvisibleDurationCts, _forwardMsgToDeadLetterQueueCts);

    // TryAdd checks and inserts atomically. A separate lookup followed by an unchecked add could hand
    // back a process queue that lost the race and was never registered in the table.
    return _processQueueTable.TryAdd(mq, processQueue) ? processQueue : null;
}
/// <summary>
/// Acknowledges a message against the broker endpoints of the message queue it belongs to and checks
/// the status of the response.
/// </summary>
/// <param name="messageView">The message to acknowledge.</param>
/// <returns>A task that completes once the acknowledgement has been sent and its status checked.</returns>
/// <exception cref="InvalidOperationException">The push consumer is not running.</exception>
public async Task AckMessage(MessageView messageView)
{
    var running = State.Running == State;
    if (!running)
    {
        throw new InvalidOperationException("Push consumer is not running");
    }

    var ackRequest = WrapAckMessageRequest(messageView);
    var brokerEndpoints = messageView.MessageQueue.Broker.Endpoints;
    var ackInvocation = await ClientManager.AckMessage(brokerEndpoints, ackRequest,
        ClientConfig.RequestTimeout);
    StatusChecker.Check(ackInvocation.Response.Status, ackRequest, ackInvocation.RequestId);
}
/// <summary>
/// Gets the topics this consumer subscribes to, i.e. the keys of the subscription expressions.
/// </summary>
/// <returns>The subscribed topics.</returns>
protected override IEnumerable<string> GetTopics() => _subscriptionExpressions.Keys;
/// <summary>
/// Builds the heartbeat request identifying this client as a push consumer of its consumer group.
/// </summary>
/// <returns>The heartbeat request.</returns>
internal override Proto.HeartbeatRequest WrapHeartbeatRequest()
{
    var heartbeat = new Proto.HeartbeatRequest();
    heartbeat.ClientType = Proto.ClientType.PushConsumer;
    heartbeat.Group = GetProtobufGroup();
    return heartbeat;
}
/// <summary>
/// Builds the request that changes the invisible duration of a message.
/// </summary>
/// <param name="messageView">The message supplying the topic, receipt handle and message id.</param>
/// <param name="invisibleDuration">The new invisible duration.</param>
/// <returns>The change invisible duration request.</returns>
protected internal ChangeInvisibleDurationRequest WrapChangeInvisibleDuration(MessageView messageView,
    TimeSpan invisibleDuration)
{
    return new Proto.ChangeInvisibleDurationRequest
    {
        Topic = new Proto.Resource
        {
            ResourceNamespace = _clientConfig.Namespace,
            Name = messageView.Topic
        },
        Group = GetProtobufGroup(),
        ReceiptHandle = messageView.ReceiptHandle,
        InvisibleDuration = Duration.FromTimeSpan(invisibleDuration),
        MessageId = messageView.MessageId
    };
}
/// <summary>
/// Builds the acknowledgement request for a single message.
/// </summary>
/// <param name="messageView">The message supplying the topic, message id and receipt handle.</param>
/// <returns>The ack request containing exactly one entry.</returns>
protected internal AckMessageRequest WrapAckMessageRequest(MessageView messageView)
{
    var topic = new Proto.Resource
    {
        ResourceNamespace = _clientConfig.Namespace,
        Name = messageView.Topic
    };
    var ackEntry = new Proto.AckMessageEntry
    {
        MessageId = messageView.MessageId,
        ReceiptHandle = messageView.ReceiptHandle,
    };

    var ackRequest = new Proto.AckMessageRequest
    {
        Group = GetProtobufGroup(),
        Topic = topic
    };
    ackRequest.Entries.Add(ackEntry);
    return ackRequest;
}
/// <summary>
/// Builds the request that forwards a message to the dead letter queue.
/// </summary>
/// <param name="messageView">
/// The message supplying the topic, receipt handle, message id and delivery attempt.
/// </param>
/// <returns>
/// The forward request; its maximum delivery attempts come from the retry policy of this consumer.
/// </returns>
protected internal ForwardMessageToDeadLetterQueueRequest WrapForwardMessageToDeadLetterQueueRequest(MessageView messageView)
{
    var topic = new Proto.Resource
    {
        ResourceNamespace = _clientConfig.Namespace,
        Name = messageView.Topic
    };

    var forwardRequest = new ForwardMessageToDeadLetterQueueRequest();
    forwardRequest.Group = GetProtobufGroup();
    forwardRequest.Topic = topic;
    forwardRequest.ReceiptHandle = messageView.ReceiptHandle;
    forwardRequest.MessageId = messageView.MessageId;
    forwardRequest.DeliveryAttempt = messageView.DeliveryAttempt;
    forwardRequest.MaxDeliveryAttempts = GetRetryPolicy().GetMaxAttempts();
    return forwardRequest;
}
/// <summary>
/// Override that performs no work for the push consumer.
/// NOTE(review): presumably invoked by the base client when the route data of a topic is updated - confirm.
/// </summary>
/// <param name="topic">The topic whose route data was updated (unused).</param>
/// <param name="topicRouteData">The updated route data (unused).</param>
protected override void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData)
{
}
/// <summary>
/// Handles a message verification command: consumes the carried message through the consume service
/// and writes the verification result back to the session of the given endpoints.
/// </summary>
/// <remarks>
/// Failures while consuming or while writing the result are logged and not rethrown.
/// </remarks>
/// <param name="endpoints">The endpoints whose session receives the verification result.</param>
/// <param name="command">The verification command carrying the nonce and the message.</param>
internal override async void OnVerifyMessageCommand(Endpoints endpoints, VerifyMessageCommand command)
{
    var nonce = command.Nonce;
    var messageView = MessageView.FromProtobuf(command.Message);
    var messageId = messageView.MessageId;
    // Declared outside the try block so the catch block can log whatever was built so far.
    Proto.TelemetryCommand telemetryCommand = null;
    try
    {
        var consumeResult = await _consumeService.Consume(messageView);
        var code = consumeResult == ConsumeResult.SUCCESS ? Code.Ok : Code.FailedToConsumeMessage;
        var status = new Status
        {
            Code = code
        };
        var verifyMessageResult = new VerifyMessageResult
        {
            Nonce = nonce
        };
        telemetryCommand = new TelemetryCommand
        {
            VerifyMessageResult = verifyMessageResult,
            Status = status
        };
        var (_, session) = GetSession(endpoints);
        await session.WriteAsync(telemetryCommand);
    }
    catch (Exception e)
    {
        // Log the endpoints the result was meant for (the parameter), not the client's own Endpoints.
        Logger.LogError(e,
            $"Failed to send message verification result command, endpoints={endpoints}, command={telemetryCommand}, messageId={messageId}, clientId={ClientId}");
    }
}
/// <summary>
/// Builds the client termination notification carrying this consumer's group.
/// </summary>
/// <returns>The notify client termination request.</returns>
internal override NotifyClientTerminationRequest WrapNotifyClientTerminationRequest()
{
    var terminationRequest = new NotifyClientTerminationRequest();
    terminationRequest.Group = GetProtobufGroup();
    return terminationRequest;
}
/// <summary>
/// Gets the number of process queues currently held by this consumer.
/// </summary>
/// <returns>The number of entries in the process queue table.</returns>
internal int GetQueueSize() => _processQueueTable.Count;
/// <summary>
/// Computes the cached message size limit of a single process queue by splitting the total limit
/// evenly among the process queues.
/// </summary>
/// <returns>0 when there is no process queue; otherwise the per-queue share, at least 1.</returns>
internal int CacheMessageBytesThresholdPerQueue()
{
    var queueCount = this.GetQueueSize();
    // All process queues are removed, no need to cache messages.
    if (queueCount <= 0)
    {
        return 0;
    }

    var bytesPerQueue = _maxCacheMessageSizeInBytes / queueCount;
    return Math.Max(1, bytesPerQueue);
}
/// <summary>
/// Computes the cached message count limit of a single process queue by splitting the total limit
/// evenly among the process queues.
/// </summary>
/// <returns>0 when there is no process queue; otherwise the per-queue share, at least 1.</returns>
internal int CacheMessageCountThresholdPerQueue()
{
    var queueCount = this.GetQueueSize();
    // All process queues are removed, no need to cache messages.
    return queueCount <= 0 ? 0 : Math.Max(1, _maxCacheMessageCount / queueCount);
}
/// <summary>
/// Gets the settings of this client, which are the push subscription settings.
/// </summary>
/// <returns>The push subscription settings.</returns>
internal override Settings GetSettings() => _pushSubscriptionSettings;
/// <summary>
/// Gets the load balancing group this consumer was created with.
/// </summary>
/// <returns>The consumer load balancing group.</returns>
public string GetConsumerGroup() => _consumerGroup;
/// <summary>
/// Gets the push subscription settings of this consumer.
/// </summary>
/// <returns>The push subscription settings.</returns>
public PushSubscriptionSettings GetPushConsumerSettings() => _pushSubscriptionSettings;
/// <summary>
/// Lists the existing subscription expressions in the push consumer.
/// </summary>
/// <returns>
/// The live topic to filter expression dictionary used by this consumer (not a copy).
/// </returns>
public ConcurrentDictionary<string, FilterExpression> GetSubscriptionExpressions() =>
    _subscriptionExpressions;
/// <summary>
/// Gets the retry policy held by the push subscription settings.
/// </summary>
/// <returns>The retry policy.</returns>
public IRetryPolicy GetRetryPolicy() => _pushSubscriptionSettings.GetRetryPolicy();
/// <summary>
/// Gets the consume service, which is created when the consumer starts.
/// </summary>
/// <returns>The consume service, or <c>null</c> if it has not been created yet.</returns>
public ConsumeService GetConsumeService() => _consumeService;
/// <summary>
/// Builds the protobuf resource describing this consumer's group within the configured namespace.
/// </summary>
/// <returns>A new protobuf resource for the consumer group.</returns>
private Proto.Resource GetProtobufGroup()
{
    var group = new Proto.Resource();
    group.ResourceNamespace = _clientConfig.Namespace;
    group.Name = ConsumerGroup;
    return group;
}
/// <summary>
/// Fluent builder that validates its inputs and creates a started <see cref="PushConsumer"/>.
/// </summary>
/// <remarks>
/// Every setter validates its argument through <c>Preconditions.CheckArgument</c> and returns the
/// builder itself so calls can be chained.
/// </remarks>
public class Builder
{
    private ClientConfig _clientConfig;
    private string _consumerGroup;
    private ConcurrentDictionary<string, FilterExpression> _subscriptionExpressions;
    private IMessageListener _messageListener;
    // Defaults used when the corresponding setter is never called.
    private int _maxCacheMessageCount = 1024;
    private int _maxCacheMessageSizeInBytes = 64 * 1024 * 1024;
    private int _consumptionThreadCount = 20;

    /// <summary>
    /// Sets the client configuration.
    /// </summary>
    /// <param name="clientConfig">The client configuration; must not be null.</param>
    /// <returns>This builder.</returns>
    public Builder SetClientConfig(ClientConfig clientConfig)
    {
        Preconditions.CheckArgument(null != clientConfig, "clientConfig should not be null");
        _clientConfig = clientConfig;
        return this;
    }

    /// <summary>
    /// Sets the consumer group.
    /// </summary>
    /// <param name="consumerGroup">
    /// The consumer group; must not be null and must match <c>ConsumerGroupRegex</c>.
    /// </param>
    /// <returns>This builder.</returns>
    public Builder SetConsumerGroup(string consumerGroup)
    {
        Preconditions.CheckArgument(null != consumerGroup, "consumerGroup should not be null");
        // The message names the argument actually being validated (it used to say "topic").
        Preconditions.CheckArgument(consumerGroup != null && ConsumerGroupRegex.Match(consumerGroup).Success,
            $"consumerGroup does not match the regex {ConsumerGroupRegex}");
        _consumerGroup = consumerGroup;
        return this;
    }

    /// <summary>
    /// Sets the subscription expressions. The entries are copied into a new concurrent dictionary.
    /// </summary>
    /// <param name="subscriptionExpressions">
    /// Topic to filter expression map; must be neither null nor empty.
    /// </param>
    /// <returns>This builder.</returns>
    public Builder SetSubscriptionExpression(Dictionary<string, FilterExpression> subscriptionExpressions)
    {
        Preconditions.CheckArgument(null != subscriptionExpressions,
            "subscriptionExpressions should not be null");
        Preconditions.CheckArgument(subscriptionExpressions!.Count != 0,
            "subscriptionExpressions should not be empty");
        _subscriptionExpressions = new ConcurrentDictionary<string, FilterExpression>(subscriptionExpressions!);
        return this;
    }

    /// <summary>
    /// Sets the message listener.
    /// </summary>
    /// <param name="messageListener">The message listener; must not be null.</param>
    /// <returns>This builder.</returns>
    public Builder SetMessageListener(IMessageListener messageListener)
    {
        Preconditions.CheckArgument(null != messageListener,
            "messageListener should not be null");
        _messageListener = messageListener;
        return this;
    }

    /// <summary>
    /// Sets the total number of messages the consumer may cache (default 1024).
    /// </summary>
    /// <param name="maxCacheMessageCount">The limit; must be positive.</param>
    /// <returns>This builder.</returns>
    public Builder SetMaxCacheMessageCount(int maxCacheMessageCount)
    {
        Preconditions.CheckArgument(maxCacheMessageCount > 0,
            "maxCacheMessageCount should be positive");
        _maxCacheMessageCount = maxCacheMessageCount;
        return this;
    }

    /// <summary>
    /// Sets the total size in bytes of messages the consumer may cache (default 64 MiB).
    /// </summary>
    /// <param name="maxCacheMessageSizeInBytes">The limit in bytes; must be positive.</param>
    /// <returns>This builder.</returns>
    public Builder SetMaxCacheMessageSizeInBytes(int maxCacheMessageSizeInBytes)
    {
        Preconditions.CheckArgument(maxCacheMessageSizeInBytes > 0,
            "maxCacheMessageSizeInBytes should be positive");
        _maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes;
        return this;
    }

    /// <summary>
    /// Sets the concurrency level used for message consumption (default 20).
    /// </summary>
    /// <param name="consumptionThreadCount">The concurrency level; must be positive.</param>
    /// <returns>This builder.</returns>
    public Builder SetConsumptionThreadCount(int consumptionThreadCount)
    {
        Preconditions.CheckArgument(consumptionThreadCount > 0,
            "consumptionThreadCount should be positive");
        _consumptionThreadCount = consumptionThreadCount;
        return this;
    }

    /// <summary>
    /// Validates that all mandatory values were set, then creates and starts the push consumer.
    /// </summary>
    /// <returns>A task producing the started push consumer.</returns>
    public async Task<PushConsumer> Build()
    {
        Preconditions.CheckArgument(null != _clientConfig, "clientConfig has not been set yet");
        Preconditions.CheckArgument(null != _consumerGroup, "consumerGroup has not been set yet");
        // The field stays null when SetSubscriptionExpression was never called; check that explicitly so
        // the precondition reports it instead of a NullReferenceException being thrown.
        Preconditions.CheckArgument(null != _subscriptionExpressions && !_subscriptionExpressions.IsEmpty,
            "subscriptionExpressions has not been set yet");
        Preconditions.CheckArgument(null != _messageListener, "messageListener has not been set yet");
        var pushConsumer = new PushConsumer(_clientConfig, _consumerGroup, _subscriptionExpressions,
            _messageListener, _maxCacheMessageCount,
            _maxCacheMessageSizeInBytes, _consumptionThreadCount);
        await pushConsumer.Start();
        return pushConsumer;
    }
}
}
}