csharp/rocketmq-client-csharp/PushConsumer.cs (553 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Schedulers; using Apache.Rocketmq.V2; using Google.Protobuf.WellKnownTypes; using Proto = Apache.Rocketmq.V2; using Microsoft.Extensions.Logging; [assembly: InternalsVisibleTo("tests")] [assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] namespace Org.Apache.Rocketmq { public class PushConsumer : Consumer, IAsyncDisposable, IDisposable { private static readonly ILogger Logger = MqLogManager.CreateLogger<PushConsumer>(); private static readonly TimeSpan AssignmentScanScheduleDelay = TimeSpan.FromSeconds(1); private static readonly TimeSpan AssignmentScanSchedulePeriod = TimeSpan.FromSeconds(5); private readonly ClientConfig _clientConfig; private readonly PushSubscriptionSettings _pushSubscriptionSettings; private readonly string _consumerGroup; private readonly ConcurrentDictionary<string, FilterExpression> _subscriptionExpressions; private readonly ConcurrentDictionary<string, Assignments> _cacheAssignments; private readonly IMessageListener _messageListener; private readonly int _maxCacheMessageCount; private readonly int _maxCacheMessageSizeInBytes; private readonly ConcurrentDictionary<MessageQueue, ProcessQueue> _processQueueTable; private ConsumeService _consumeService; private readonly TaskScheduler _consumptionTaskScheduler; private readonly CancellationTokenSource _consumptionCts; private readonly CancellationTokenSource _scanAssignmentCts; private readonly CancellationTokenSource _receiveMsgCts; private readonly CancellationTokenSource _ackMsgCts; private readonly CancellationTokenSource _changeInvisibleDurationCts; private readonly CancellationTokenSource _forwardMsgToDeadLetterQueueCts; /// <summary> /// The caller is supposed to have validated the arguments and handled throwing exception or /// logging warnings already, so we avoid repeating args check here. /// </summary> public PushConsumer(ClientConfig clientConfig, string consumerGroup, ConcurrentDictionary<string, FilterExpression> subscriptionExpressions, IMessageListener messageListener, int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount) : base(clientConfig, consumerGroup) { _clientConfig = clientConfig; _consumerGroup = consumerGroup; _subscriptionExpressions = subscriptionExpressions; _pushSubscriptionSettings = new PushSubscriptionSettings(_clientConfig.Namespace, ClientId, Endpoints, consumerGroup, clientConfig.RequestTimeout, subscriptionExpressions); _cacheAssignments = new ConcurrentDictionary<string, Assignments>(); _messageListener = messageListener; _maxCacheMessageCount = maxCacheMessageCount; _maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes; _scanAssignmentCts = new CancellationTokenSource(); _processQueueTable = new ConcurrentDictionary<MessageQueue, ProcessQueue>(); _consumptionTaskScheduler = new LimitedConcurrencyLevelTaskScheduler(consumptionThreadCount); _consumptionCts = new CancellationTokenSource(); _receiveMsgCts = new CancellationTokenSource(); _ackMsgCts = new CancellationTokenSource(); _changeInvisibleDurationCts = new CancellationTokenSource(); _forwardMsgToDeadLetterQueueCts = new CancellationTokenSource(); } protected override async Task Start() { try { State = State.Starting; Logger.LogInformation($"Begin to start the rocketmq push consumer, clientId={ClientId}"); await base.Start(); _consumeService = CreateConsumerService(); ScheduleWithFixedDelay(ScanAssignments, AssignmentScanScheduleDelay, AssignmentScanSchedulePeriod, _scanAssignmentCts.Token); Logger.LogInformation($"The rocketmq push consumer starts successfully, clientId={ClientId}"); State = State.Running; } catch (Exception) { State = State.Failed; throw; } } public async ValueTask DisposeAsync() { await Shutdown().ConfigureAwait(false); GC.SuppressFinalize(this); } public void Dispose() { Shutdown().Wait(); GC.SuppressFinalize(this); } protected override async Task Shutdown() { try { State = State.Stopping; Logger.LogInformation($"Begin to shutdown the rocketmq push consumer, clientId={ClientId}"); _receiveMsgCts.Cancel(); _ackMsgCts.Cancel(); _changeInvisibleDurationCts.Cancel(); _forwardMsgToDeadLetterQueueCts.Cancel(); _scanAssignmentCts.Cancel(); await base.Shutdown(); _consumptionCts.Cancel(); Logger.LogInformation($"Shutdown the rocketmq push consumer successfully, clientId={ClientId}"); State = State.Terminated; } catch (Exception) { State = State.Failed; throw; } } private ConsumeService CreateConsumerService() { if (_pushSubscriptionSettings.IsFifo()) { Logger.LogInformation( $"Create FIFO consume service, consumerGroup={_consumerGroup}, clientId={ClientId}"); return new FifoConsumeService(ClientId, _messageListener, _consumptionTaskScheduler, _consumptionCts.Token); } Logger.LogInformation( $"Create standard consume service, consumerGroup={_consumerGroup}, clientId={ClientId}"); return new StandardConsumeService(ClientId, _messageListener, _consumptionTaskScheduler, _consumptionCts.Token); } /// <summary> /// Adds a subscription expression dynamically. /// </summary> /// <param name="filterExpression">The new filter expression to add.</param> /// <returns>The push consumer instance.</returns> public async Task Subscribe(string topic, FilterExpression filterExpression) { if (State.Running != State) { throw new InvalidOperationException("Push consumer is not running"); } await GetRouteData(topic); _subscriptionExpressions[topic] = filterExpression; } /// <summary> /// Removes a subscription expression dynamically by topic. /// </summary> /// <remarks> /// It stops the backend task to fetch messages from the server. /// The locally cached messages whose topic was removed before would not be delivered /// to the <see cref="IMessageListener"/> anymore. /// /// Nothing occurs if the specified topic does not exist in subscription expressions /// of the push consumer. /// </remarks> /// <param name="topic">The topic to remove the subscription.</param> /// <returns>The push consumer instance.</returns> public void Unsubscribe(string topic) { if (State.Running != State) { throw new InvalidOperationException("Push consumer is not running"); } _subscriptionExpressions.TryRemove(topic, out _); } internal void ScanAssignments() { try { Logger.LogDebug($"Start to scan assignments periodically, clientId={ClientId}"); foreach (var (topic, filterExpression) in _subscriptionExpressions) { var existed = _cacheAssignments.GetValueOrDefault(topic); var queryAssignmentTask = QueryAssignment(topic); queryAssignmentTask.ContinueWith(task => { if (task.IsFaulted) { Logger.LogError(task.Exception, "Exception raised while scanning the assignments," + $" topic={topic}, clientId={ClientId}"); return; } var latest = task.Result; if (latest.GetAssignmentList().Count == 0) { if (existed == null || existed.GetAssignmentList().Count == 0) { Logger.LogInformation("Acquired empty assignments from remote, would scan later," + $" topic={topic}, clientId={ClientId}"); return; } Logger.LogInformation("Attention!!! acquired empty assignments from remote, but" + $" existed assignments are not empty, topic={topic}," + $" clientId={ClientId}"); } if (!latest.Equals(existed)) { Logger.LogInformation($"Assignments of topic={topic} has changed, {existed} =>" + $" {latest}, clientId={ClientId}"); SyncProcessQueue(topic, latest, filterExpression); _cacheAssignments[topic] = latest; return; } Logger.LogDebug($"Assignments of topic={topic} remain the same," + $" assignments={existed}, clientId={ClientId}"); // Process queue may be dropped, need to be synchronized anyway. SyncProcessQueue(topic, latest, filterExpression); }, TaskContinuationOptions.ExecuteSynchronously); } } catch (Exception ex) { Logger.LogError(ex, $"Exception raised while scanning the assignments for all topics, clientId={ClientId}"); } } private void SyncProcessQueue(string topic, Assignments assignments, FilterExpression filterExpression) { var latest = new HashSet<MessageQueue>(); var assignmentList = assignments.GetAssignmentList(); foreach (var assignment in assignmentList) { latest.Add(assignment.MessageQueue); } var activeMqs = new HashSet<MessageQueue>(); foreach (var (mq, pq) in _processQueueTable) { if (!topic.Equals(mq.Topic)) { continue; } if (!latest.Contains(mq)) { Logger.LogInformation($"Drop message queue according to the latest assignmentList," + $" mq={mq}, clientId={ClientId}"); DropProcessQueue(mq); continue; } if (pq.Expired()) { Logger.LogWarning($"Drop message queue because it is expired," + $" mq={mq}, clientId={ClientId}"); DropProcessQueue(mq); continue; } activeMqs.Add(mq); } foreach (var mq in latest) { if (activeMqs.Contains(mq)) { continue; } var processQueue = CreateProcessQueue(mq, filterExpression); if (processQueue != null) { Logger.LogInformation($"Start to fetch message from remote, mq={mq}, clientId={ClientId}"); processQueue.FetchMessageImmediately(); } } } internal Task<Assignments> QueryAssignment(string topic) { var pickEndpointsTask = PickEndpointsToQueryAssignments(topic); return pickEndpointsTask.ContinueWith(task0 => { if (task0 is { IsFaulted: true, Exception: { } }) { throw task0.Exception; } var endpoints = task0.Result; var request = WrapQueryAssignmentRequest(topic); var requestTimeout = _clientConfig.RequestTimeout; var queryAssignmentTask = ClientManager.QueryAssignment(endpoints, request, requestTimeout); return queryAssignmentTask.ContinueWith(task1 => { if (task1 is { IsFaulted: true, Exception: { } }) { throw task1.Exception; } var response = task1.Result.Response; var status = response.Status; StatusChecker.Check(status, request, task1.Result.RequestId); var assignmentList = response.Assignments .Select(assignment => new Assignment(new MessageQueue(assignment.MessageQueue))) .ToList(); return Task.FromResult(new Assignments(assignmentList)); }, TaskContinuationOptions.ExecuteSynchronously).Unwrap(); }, TaskContinuationOptions.ExecuteSynchronously).Unwrap(); } private Task<Endpoints> PickEndpointsToQueryAssignments(string topic) { var getRouteDataTask = GetRouteData(topic); return getRouteDataTask.ContinueWith(task => { if (task is { IsFaulted: true, Exception: { } }) { throw task.Exception; } var topicRouteData = task.Result; return topicRouteData.PickEndpointsToQueryAssignments(); }, TaskContinuationOptions.ExecuteSynchronously); } private QueryAssignmentRequest WrapQueryAssignmentRequest(string topic) { var topicResource = new Proto.Resource { ResourceNamespace = _clientConfig.Namespace, Name = topic }; return new QueryAssignmentRequest { Topic = topicResource, Group = GetProtobufGroup(), Endpoints = Endpoints.ToProtobuf() }; } /// <summary> /// Drops the <see cref="ProcessQueue"/> by <see cref="MessageQueue"/>. /// <see cref="ProcessQueue"/> must be removed before it is dropped. /// </summary> /// <param name="mq">The message queue.</param> internal void DropProcessQueue(MessageQueue mq) { if (_processQueueTable.TryRemove(mq, out var pq)) { pq.Drop(); } } /// <summary> /// Creates a process queue and adds it into the <see cref="_processQueueTable"/>. /// Returns <see cref="ProcessQueue"/> if the mapped process queue already exists. /// </summary> /// <remarks> /// This function and <see cref="DropProcessQueue"/> ensure that a process queue is not dropped if /// it is contained in <see cref="_processQueueTable"/>. Once a process queue is dropped, it must have been /// removed from <see cref="_processQueueTable"/>. /// </remarks> /// <param name="mq">The message queue.</param> /// <param name="filterExpression">The filter expression of the topic.</param> /// <returns>A process queue.</returns> protected ProcessQueue CreateProcessQueue(MessageQueue mq, FilterExpression filterExpression) { var processQueue = new ProcessQueue(this, mq, filterExpression, _receiveMsgCts, _ackMsgCts, _changeInvisibleDurationCts, _forwardMsgToDeadLetterQueueCts); if (_processQueueTable.TryGetValue(mq, out var previous)) { return null; } _processQueueTable.TryAdd(mq, processQueue); return processQueue; } public async Task AckMessage(MessageView messageView) { if (State.Running != State) { throw new InvalidOperationException("Push consumer is not running"); } var request = WrapAckMessageRequest(messageView); var invocation = await ClientManager.AckMessage(messageView.MessageQueue.Broker.Endpoints, request, ClientConfig.RequestTimeout); StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId); } protected override IEnumerable<string> GetTopics() { return _subscriptionExpressions.Keys; } internal override Proto.HeartbeatRequest WrapHeartbeatRequest() { return new Proto::HeartbeatRequest { ClientType = Proto.ClientType.PushConsumer, Group = GetProtobufGroup() }; } protected internal ChangeInvisibleDurationRequest WrapChangeInvisibleDuration(MessageView messageView, TimeSpan invisibleDuration) { var topicResource = new Proto.Resource { ResourceNamespace = _clientConfig.Namespace, Name = messageView.Topic }; return new Proto.ChangeInvisibleDurationRequest { Topic = topicResource, Group = GetProtobufGroup(), ReceiptHandle = messageView.ReceiptHandle, InvisibleDuration = Duration.FromTimeSpan(invisibleDuration), MessageId = messageView.MessageId }; } protected internal AckMessageRequest WrapAckMessageRequest(MessageView messageView) { var topicResource = new Proto.Resource { ResourceNamespace = _clientConfig.Namespace, Name = messageView.Topic }; var entry = new Proto.AckMessageEntry { MessageId = messageView.MessageId, ReceiptHandle = messageView.ReceiptHandle, }; return new Proto.AckMessageRequest { Group = GetProtobufGroup(), Topic = topicResource, Entries = { entry } }; } protected internal ForwardMessageToDeadLetterQueueRequest WrapForwardMessageToDeadLetterQueueRequest(MessageView messageView) { var topicResource = new Proto.Resource { ResourceNamespace = _clientConfig.Namespace, Name = messageView.Topic }; return new ForwardMessageToDeadLetterQueueRequest { Group = GetProtobufGroup(), Topic = topicResource, ReceiptHandle = messageView.ReceiptHandle, MessageId = messageView.MessageId, DeliveryAttempt = messageView.DeliveryAttempt, MaxDeliveryAttempts = GetRetryPolicy().GetMaxAttempts() }; } protected override void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData) { } internal override async void OnVerifyMessageCommand(Endpoints endpoints, VerifyMessageCommand command) { var nonce = command.Nonce; var messageView = MessageView.FromProtobuf(command.Message); var messageId = messageView.MessageId; Proto.TelemetryCommand telemetryCommand = null; try { var consumeResult = await _consumeService.Consume(messageView); var code = consumeResult == ConsumeResult.SUCCESS ? Code.Ok : Code.FailedToConsumeMessage; var status = new Status { Code = code }; var verifyMessageResult = new VerifyMessageResult { Nonce = nonce }; telemetryCommand = new TelemetryCommand { VerifyMessageResult = verifyMessageResult, Status = status }; var (_, session) = GetSession(endpoints); await session.WriteAsync(telemetryCommand); } catch (Exception e) { Logger.LogError(e, $"Failed to send message verification result command, endpoints={Endpoints}, command={telemetryCommand}, messageId={messageId}, clientId={ClientId}"); } } internal override NotifyClientTerminationRequest WrapNotifyClientTerminationRequest() { return new NotifyClientTerminationRequest() { Group = GetProtobufGroup() }; } internal int GetQueueSize() { return _processQueueTable.Count; } internal int CacheMessageBytesThresholdPerQueue() { var size = this.GetQueueSize(); // All process queues are removed, no need to cache messages. return size <= 0 ? 0 : Math.Max(1, _maxCacheMessageSizeInBytes / size); } internal int CacheMessageCountThresholdPerQueue() { var size = this.GetQueueSize(); // All process queues are removed, no need to cache messages. if (size <= 0) { return 0; } return Math.Max(1, _maxCacheMessageCount / size); } internal override Settings GetSettings() { return _pushSubscriptionSettings; } /// <summary> /// Gets the load balancing group for the consumer. /// </summary> /// <returns>The consumer load balancing group.</returns> public string GetConsumerGroup() { return _consumerGroup; } public PushSubscriptionSettings GetPushConsumerSettings() { return _pushSubscriptionSettings; } /// <summary> /// Lists the existing subscription expressions in the push consumer. /// </summary> /// <returns>Collections of the subscription expressions.</returns> public ConcurrentDictionary<string, FilterExpression> GetSubscriptionExpressions() { return _subscriptionExpressions; } public IRetryPolicy GetRetryPolicy() { return _pushSubscriptionSettings.GetRetryPolicy(); } public ConsumeService GetConsumeService() { return _consumeService; } private Proto.Resource GetProtobufGroup() { return new Proto.Resource() { ResourceNamespace = _clientConfig.Namespace, Name = ConsumerGroup }; } public class Builder { private ClientConfig _clientConfig; private string _consumerGroup; private ConcurrentDictionary<string, FilterExpression> _subscriptionExpressions; private IMessageListener _messageListener; private int _maxCacheMessageCount = 1024; private int _maxCacheMessageSizeInBytes = 64 * 1024 * 1024; private int _consumptionThreadCount = 20; public Builder SetClientConfig(ClientConfig clientConfig) { Preconditions.CheckArgument(null != clientConfig, "clientConfig should not be null"); _clientConfig = clientConfig; return this; } public Builder SetConsumerGroup(string consumerGroup) { Preconditions.CheckArgument(null != consumerGroup, "consumerGroup should not be null"); Preconditions.CheckArgument(consumerGroup != null && ConsumerGroupRegex.Match(consumerGroup).Success, $"topic does not match the regex {ConsumerGroupRegex}"); _consumerGroup = consumerGroup; return this; } public Builder SetSubscriptionExpression(Dictionary<string, FilterExpression> subscriptionExpressions) { Preconditions.CheckArgument(null != subscriptionExpressions, "subscriptionExpressions should not be null"); Preconditions.CheckArgument(subscriptionExpressions!.Count != 0, "subscriptionExpressions should not be empty"); _subscriptionExpressions = new ConcurrentDictionary<string, FilterExpression>(subscriptionExpressions!); return this; } public Builder SetMessageListener(IMessageListener messageListener) { Preconditions.CheckArgument(null != messageListener, "messageListener should not be null"); _messageListener = messageListener; return this; } public Builder SetMaxCacheMessageCount(int maxCacheMessageCount) { Preconditions.CheckArgument(maxCacheMessageCount > 0, "maxCacheMessageCount should be positive"); _maxCacheMessageCount = maxCacheMessageCount; return this; } public Builder SetMaxCacheMessageSizeInBytes(int maxCacheMessageSizeInBytes) { Preconditions.CheckArgument(maxCacheMessageSizeInBytes > 0, "maxCacheMessageSizeInBytes should be positive"); _maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes; return this; } public Builder SetConsumptionThreadCount(int consumptionThreadCount) { Preconditions.CheckArgument(consumptionThreadCount > 0, "consumptionThreadCount should be positive"); _consumptionThreadCount = consumptionThreadCount; return this; } public async Task<PushConsumer> Build() { Preconditions.CheckArgument(null != _clientConfig, "clientConfig has not been set yet"); Preconditions.CheckArgument(null != _consumerGroup, "consumerGroup has not been set yet"); Preconditions.CheckArgument(!_subscriptionExpressions!.IsEmpty, "subscriptionExpressions has not been set yet"); Preconditions.CheckArgument(null != _messageListener, "messageListener has not been set yet"); var pushConsumer = new PushConsumer(_clientConfig, _consumerGroup, _subscriptionExpressions, _messageListener, _maxCacheMessageCount, _maxCacheMessageSizeInBytes, _consumptionThreadCount); await pushConsumer.Start(); return pushConsumer; } } } }