csharp/rocketmq-client-csharp/SimpleConsumer.cs (284 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Logging;
using Proto = Apache.Rocketmq.V2;
using Org.Apache.Rocketmq.Error;

namespace Org.Apache.Rocketmq
{
    /// <summary>
    /// Consumer that receives messages on demand via <see cref="Receive"/> and lets the caller
    /// acknowledge them (<see cref="Ack"/>) or change their invisible duration
    /// (<see cref="ChangeInvisibleDuration"/>). Instances are created through <see cref="Builder"/>.
    /// </summary>
    public class SimpleConsumer : Consumer, IAsyncDisposable, IDisposable
    {
        private static readonly ILogger Logger = MqLogManager.CreateLogger<SimpleConsumer>();

        // Per-topic load balancer built from the most recent route data seen for that topic.
        private readonly ConcurrentDictionary<string /* topic */, SubscriptionLoadBalancer> _subscriptionRouteDataCache;

        // Current subscriptions; mutated by Subscribe/Unsubscribe and read by Receive/GetTopics.
        private readonly ConcurrentDictionary<string /* topic */, FilterExpression> _subscriptionExpressions;

        // Forwarded to every receive request and to the receive call itself.
        private readonly TimeSpan _awaitDuration;
        private readonly SimpleSubscriptionSettings _simpleSubscriptionSettings;

        // Monotonically incremented counter used to rotate over the subscribed topics.
        private int _topicRoundRobinIndex;
        private readonly ClientConfig _clientConfig;

        /// <summary>
        /// Creates a simple consumer from a plain dictionary of subscriptions. The dictionary is
        /// copied, so later changes to <paramref name="subscriptionExpressions"/> are not observed.
        /// </summary>
        /// <param name="clientConfig">Client configuration.</param>
        /// <param name="consumerGroup">Name of the consumer group.</param>
        /// <param name="awaitDuration">Await duration applied to each receive request.</param>
        /// <param name="subscriptionExpressions">Initial topic-to-filter subscriptions.</param>
        public SimpleConsumer(ClientConfig clientConfig, string consumerGroup, TimeSpan awaitDuration,
            Dictionary<string, FilterExpression> subscriptionExpressions) : this(clientConfig, consumerGroup,
            awaitDuration, new ConcurrentDictionary<string, FilterExpression>(subscriptionExpressions))
        {
        }

        // Shared constructor; takes ownership of the supplied concurrent dictionary, which is also
        // handed to the subscription settings.
        private SimpleConsumer(ClientConfig clientConfig, string consumerGroup, TimeSpan awaitDuration,
            ConcurrentDictionary<string, FilterExpression> subscriptionExpressions) : base(clientConfig,
            consumerGroup)
        {
            _awaitDuration = awaitDuration;
            _subscriptionRouteDataCache = new ConcurrentDictionary<string, SubscriptionLoadBalancer>();
            _subscriptionExpressions = subscriptionExpressions;
            _simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientConfig.Namespace, ClientId, Endpoints,
                ConsumerGroup, clientConfig.RequestTimeout, awaitDuration, subscriptionExpressions);
            _topicRoundRobinIndex = 0;
            _clientConfig = clientConfig;
        }

        /// <summary>
        /// Adds or replaces the subscription for <paramref name="topic"/>. The topic's load balancer
        /// is resolved first, so the subscription is only recorded if that lookup succeeds.
        /// </summary>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <param name="filterExpression">Filter expression to apply to the topic.</param>
        /// <exception cref="InvalidOperationException">The consumer is not running.</exception>
        public async Task Subscribe(string topic, FilterExpression filterExpression)
        {
            if (State.Running != State)
            {
                throw new InvalidOperationException("Simple consumer is not running");
            }

            await GetSubscriptionLoadBalancer(topic);
            _subscriptionExpressions[topic] = filterExpression;
        }

        /// <summary>
        /// Removes the subscription for <paramref name="topic"/>; a topic that is not subscribed is ignored.
        /// </summary>
        /// <param name="topic">Topic to unsubscribe from.</param>
        /// <exception cref="InvalidOperationException">The consumer is not running.</exception>
        public void Unsubscribe(string topic)
        {
            if (State.Running != State)
            {
                throw new InvalidOperationException("Simple consumer is not running");
            }

            _subscriptionExpressions.TryRemove(topic, out _);
        }

        /// <summary>
        /// Starts the consumer, moving the state through Starting to Running, or to Failed when the
        /// base start throws (the exception is rethrown).
        /// </summary>
        protected override async Task Start()
        {
            try
            {
                State = State.Starting;
                Logger.LogInformation($"Begin to start the rocketmq simple consumer, clientId={ClientId}");
                await base.Start();
                Logger.LogInformation($"The rocketmq simple consumer starts successfully, clientId={ClientId}");
                State = State.Running;
            }
            catch (Exception)
            {
                State = State.Failed;
                throw;
            }
        }

        /// <summary>
        /// Shuts the consumer down asynchronously.
        /// </summary>
        public async ValueTask DisposeAsync()
        {
            await Shutdown().ConfigureAwait(false);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Shuts the consumer down, blocking the calling thread until shutdown has completed.
        /// <see cref="DisposeAsync"/> performs the same shutdown without blocking.
        /// </summary>
        public void Dispose()
        {
            Shutdown().Wait();
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Stops the consumer, moving the state through Stopping to Terminated, or to Failed when the
        /// base shutdown throws (the exception is rethrown).
        /// </summary>
        protected override async Task Shutdown()
        {
            try
            {
                State = State.Stopping;
                Logger.LogInformation($"Begin to shutdown the rocketmq simple consumer, clientId={ClientId}");
                await base.Shutdown();
                Logger.LogInformation($"Shutdown the rocketmq simple consumer successfully, clientId={ClientId}");
                State = State.Terminated;
            }
            catch (Exception)
            {
                State = State.Failed;
                throw;
            }
        }

        /// <summary>
        /// Returns the topics that currently have a subscription.
        /// </summary>
        protected override IEnumerable<string> GetTopics()
        {
            return _subscriptionExpressions.Keys;
        }

        /// <summary>
        /// Builds the client-termination notification for this consumer group.
        /// </summary>
        internal override Proto.NotifyClientTerminationRequest WrapNotifyClientTerminationRequest()
        {
            return new Proto.NotifyClientTerminationRequest()
            {
                Group = GetProtobufGroup()
            };
        }

        /// <summary>
        /// Builds the heartbeat request identifying this client as a simple consumer of its group.
        /// </summary>
        internal override Proto.HeartbeatRequest WrapHeartbeatRequest()
        {
            return new Proto.HeartbeatRequest
            {
                ClientType = Proto.ClientType.SimpleConsumer,
                Group = GetProtobufGroup()
            };
        }

        // Creates the load balancer for the topic, or derives an updated one from the cached
        // instance, stores the result in the cache and returns it.
        private SubscriptionLoadBalancer UpdateSubscriptionLoadBalancer(string topic, TopicRouteData topicRouteData)
        {
            if (_subscriptionRouteDataCache.TryGetValue(topic, out var subscriptionLoadBalancer))
            {
                subscriptionLoadBalancer = subscriptionLoadBalancer.Update(topicRouteData);
            }
            else
            {
                subscriptionLoadBalancer = new SubscriptionLoadBalancer(topicRouteData);
            }

            _subscriptionRouteDataCache[topic] = subscriptionLoadBalancer;
            return subscriptionLoadBalancer;
        }

        // Returns the cached load balancer for the topic, fetching route data to build one on a cache miss.
        private async Task<SubscriptionLoadBalancer> GetSubscriptionLoadBalancer(string topic)
        {
            if (_subscriptionRouteDataCache.TryGetValue(topic, out var subscriptionLoadBalancer))
            {
                return subscriptionLoadBalancer;
            }

            var topicRouteData = await GetRouteData(topic);
            return UpdateSubscriptionLoadBalancer(topic, topicRouteData);
        }

        /// <summary>
        /// Refreshes the cached load balancer of <paramref name="topic"/> with the given route data.
        /// </summary>
        protected override void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData)
        {
            UpdateSubscriptionLoadBalancer(topic, topicRouteData);
        }

        /// <summary>
        /// Returns the subscription settings of this consumer.
        /// </summary>
        internal override Settings GetSettings()
        {
            return _simpleSubscriptionSettings;
        }

        /// <summary>
        /// Receives messages from one of the subscribed topics, rotating over the topics on
        /// successive calls.
        /// </summary>
        /// <param name="maxMessageNum">Maximum number of messages requested; must be positive.</param>
        /// <param name="invisibleDuration">Invisible duration passed along with the receive request.</param>
        /// <returns>The messages carried by the receive result.</returns>
        /// <exception cref="InvalidOperationException">The consumer is not running.</exception>
        /// <exception cref="InternalErrorException"><paramref name="maxMessageNum"/> is not positive.</exception>
        /// <exception cref="ArgumentException">There is no subscribed topic.</exception>
        public async Task<List<MessageView>> Receive(int maxMessageNum, TimeSpan invisibleDuration)
        {
            if (State.Running != State)
            {
                throw new InvalidOperationException("Simple consumer is not running");
            }

            if (maxMessageNum <= 0)
            {
                throw new InternalErrorException("maxMessageNum must be greater than 0");
            }

            // Work on a snapshot so that topic selection and filter lookup see the same set of
            // subscriptions even if Subscribe/Unsubscribe run concurrently.
            var copy = new ConcurrentDictionary<string, FilterExpression>(_subscriptionExpressions);
            var topics = new List<string>(copy.Keys);
            if (topics.Count <= 0)
            {
                throw new ArgumentException("There is no topic to receive message");
            }

            var index = Utilities.GetPositiveMod(Interlocked.Increment(ref _topicRoundRobinIndex), topics.Count);
            var topic = topics[index];
            // Read from the snapshot, not the live dictionary: the topic may have been removed by a
            // concurrent Unsubscribe, which would make the live indexer throw KeyNotFoundException.
            var filterExpression = copy[topic];
            var subscriptionLoadBalancer = await GetSubscriptionLoadBalancer(topic);

            var mq = subscriptionLoadBalancer.TakeMessageQueue();
            var request =
                WrapReceiveMessageRequest(maxMessageNum, mq, filterExpression, _awaitDuration, invisibleDuration);
            var receiveMessageResult = await ReceiveMessage(request, mq, _awaitDuration);
            return receiveMessageResult.Messages;
        }

        /// <summary>
        /// Changes the invisible duration of a previously received message.
        /// </summary>
        /// <param name="messageView">Message whose invisible duration is changed.</param>
        /// <param name="invisibleDuration">New invisible duration.</param>
        /// <exception cref="InvalidOperationException">The consumer is not running.</exception>
        public async Task ChangeInvisibleDuration(MessageView messageView, TimeSpan invisibleDuration)
        {
            if (State.Running != State)
            {
                throw new InvalidOperationException("Simple consumer is not running");
            }

            var request = WrapChangeInvisibleDuration(messageView, invisibleDuration);
            var invocation = await ClientManager.ChangeInvisibleDuration(messageView.MessageQueue.Broker.Endpoints,
                request, ClientConfig.RequestTimeout);
            StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
        }

        /// <summary>
        /// Acknowledges a previously received message.
        /// </summary>
        /// <param name="messageView">Message to acknowledge.</param>
        /// <exception cref="InvalidOperationException">The consumer is not running.</exception>
        public async Task Ack(MessageView messageView)
        {
            if (State.Running != State)
            {
                throw new InvalidOperationException("Simple consumer is not running");
            }

            var request = WrapAckMessageRequest(messageView);
            var invocation = await ClientManager.AckMessage(messageView.MessageQueue.Broker.Endpoints, request,
                ClientConfig.RequestTimeout);
            StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
        }

        // Builds an ack request with a single entry for the given message.
        private Proto.AckMessageRequest WrapAckMessageRequest(MessageView messageView)
        {
            var topicResource = new Proto.Resource
            {
                ResourceNamespace = _clientConfig.Namespace,
                Name = messageView.Topic
            };
            var entry = new Proto.AckMessageEntry
            {
                MessageId = messageView.MessageId,
                ReceiptHandle = messageView.ReceiptHandle,
            };
            return new Proto.AckMessageRequest
            {
                Group = GetProtobufGroup(),
                Topic = topicResource,
                Entries = { entry }
            };
        }

        // Builds the change-invisible-duration request for the given message.
        private Proto.ChangeInvisibleDurationRequest WrapChangeInvisibleDuration(MessageView messageView,
            TimeSpan invisibleDuration)
        {
            var topicResource = new Proto.Resource
            {
                ResourceNamespace = _clientConfig.Namespace,
                Name = messageView.Topic
            };
            return new Proto.ChangeInvisibleDurationRequest
            {
                Topic = topicResource,
                Group = GetProtobufGroup(),
                ReceiptHandle = messageView.ReceiptHandle,
                InvisibleDuration = Duration.FromTimeSpan(invisibleDuration),
                MessageId = messageView.MessageId
            };
        }

        // Builds the protobuf resource describing this consumer group within the configured namespace.
        private Proto.Resource GetProtobufGroup()
        {
            return new Proto.Resource()
            {
                ResourceNamespace = _clientConfig.Namespace,
                Name = ConsumerGroup
            };
        }

        /// <summary>
        /// Fluent builder that validates its inputs and produces a started <see cref="SimpleConsumer"/>.
        /// </summary>
        public class Builder
        {
            private ClientConfig _clientConfig;
            private string _consumerGroup;
            private TimeSpan _awaitDuration;
            private ConcurrentDictionary<string, FilterExpression> _subscriptionExpressions;

            /// <summary>
            /// Sets the client configuration; must not be null.
            /// </summary>
            public Builder SetClientConfig(ClientConfig clientConfig)
            {
                Preconditions.CheckArgument(null != clientConfig, "clientConfig should not be null");
                _clientConfig = clientConfig;
                return this;
            }

            /// <summary>
            /// Sets the consumer group; must be non-null and match the consumer group regex.
            /// </summary>
            public Builder SetConsumerGroup(string consumerGroup)
            {
                Preconditions.CheckArgument(null != consumerGroup, "consumerGroup should not be null");
                Preconditions.CheckArgument(consumerGroup != null && ConsumerGroupRegex.Match(consumerGroup).Success,
                    $"consumerGroup does not match the regex {ConsumerGroupRegex}");
                _consumerGroup = consumerGroup;
                return this;
            }

            /// <summary>
            /// Sets the await duration applied to receive requests.
            /// </summary>
            public Builder SetAwaitDuration(TimeSpan awaitDuration)
            {
                _awaitDuration = awaitDuration;
                return this;
            }

            /// <summary>
            /// Sets the initial subscriptions; must be non-null and non-empty. The dictionary is copied.
            /// </summary>
            public Builder SetSubscriptionExpression(Dictionary<string, FilterExpression> subscriptionExpressions)
            {
                Preconditions.CheckArgument(null != subscriptionExpressions,
                    "subscriptionExpressions should not be null");
                Preconditions.CheckArgument(subscriptionExpressions!.Count != 0,
                    "subscriptionExpressions should not be empty");
                _subscriptionExpressions =
                    new ConcurrentDictionary<string, FilterExpression>(subscriptionExpressions!);
                return this;
            }

            /// <summary>
            /// Validates that the required values were supplied, then creates and starts the consumer.
            /// </summary>
            /// <returns>The started consumer.</returns>
            public async Task<SimpleConsumer> Build()
            {
                Preconditions.CheckArgument(null != _clientConfig, "clientConfig has not been set yet");
                Preconditions.CheckArgument(null != _consumerGroup, "consumerGroup has not been set yet");
                // The null check belongs inside the precondition: when SetSubscriptionExpression was
                // never called the field is null, and dereferencing it would bypass the intended error.
                Preconditions.CheckArgument(_subscriptionExpressions != null && !_subscriptionExpressions.IsEmpty,
                    "subscriptionExpressions has not been set yet");
                var simpleConsumer = new SimpleConsumer(_clientConfig, _consumerGroup, _awaitDuration,
                    _subscriptionExpressions);
                await simpleConsumer.Start();
                return simpleConsumer;
            }
        }
    }
}