csharp/rocketmq-client-csharp/PushSubscriptionSettings.cs (108 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections.Concurrent; using System.Collections.Generic; using Google.Protobuf.WellKnownTypes; using Microsoft.Extensions.Logging; using Proto = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq { public class PushSubscriptionSettings : Settings { private static readonly ILogger Logger = MqLogManager.CreateLogger<PushSubscriptionSettings>(); private readonly Resource _group; private readonly ConcurrentDictionary<string, FilterExpression> _subscriptionExpressions; private volatile bool _fifo = false; private volatile int _receiveBatchSize = 32; private TimeSpan _longPollingTimeout = TimeSpan.FromSeconds(30); public PushSubscriptionSettings(string namespaceName, string clientId, Endpoints endpoints, string consumerGroup, TimeSpan requestTimeout, ConcurrentDictionary<string, FilterExpression> subscriptionExpressions) : base(namespaceName, clientId, ClientType.PushConsumer, endpoints, requestTimeout) { _group = new Resource(namespaceName, consumerGroup); _subscriptionExpressions = subscriptionExpressions; } public bool IsFifo() { return _fifo; } public int GetReceiveBatchSize() { return _receiveBatchSize; } public TimeSpan GetLongPollingTimeout() { return _longPollingTimeout; } public override Proto.Settings ToProtobuf() { var subscriptionEntries = new List<Proto.SubscriptionEntry>(); foreach (var (key, value) in _subscriptionExpressions) { var topic = new Proto.Resource() { ResourceNamespace = Namespace, Name = key }; var filterExpression = new Proto.FilterExpression() { Expression = value.Expression }; switch (value.Type) { case ExpressionType.Tag: filterExpression.Type = Proto.FilterType.Tag; break; case ExpressionType.Sql92: filterExpression.Type = Proto.FilterType.Sql; break; default: Logger.LogWarning($"[Bug] Unrecognized filter type={value.Type} for push consumer"); break; } var subscriptionEntry = new Proto.SubscriptionEntry { Topic = topic, Expression = filterExpression }; subscriptionEntries.Add(subscriptionEntry); } var subscription = new Proto.Subscription { Group = _group.ToProtobuf(), Subscriptions = { subscriptionEntries } }; return new Proto.Settings { AccessPoint = Endpoints.ToProtobuf(), ClientType = ClientTypeHelper.ToProtobuf(ClientType), RequestTimeout = Duration.FromTimeSpan(RequestTimeout), Subscription = subscription, UserAgent = UserAgent.Instance.ToProtobuf() }; } public override void Sync(Proto.Settings settings) { if (Proto.Settings.PubSubOneofCase.Subscription != settings.PubSubCase) { Logger.LogError($"[Bug] Issued settings doesn't match with the client type, clientId={ClientId}, " + $"pubSubCase={settings.PubSubCase}, clientType={ClientType}"); } var subscription = settings.Subscription ?? new Proto.Subscription(); _fifo = subscription.Fifo; _receiveBatchSize = subscription.ReceiveBatchSize; _longPollingTimeout = subscription.LongPollingTimeout?.ToTimeSpan() ?? TimeSpan.Zero; var backoffPolicy = settings.BackoffPolicy ?? new Proto.RetryPolicy(); switch (backoffPolicy.StrategyCase) { case Proto.RetryPolicy.StrategyOneofCase.ExponentialBackoff: RetryPolicy = ExponentialBackoffRetryPolicy.FromProtobuf(backoffPolicy); break; case Proto.RetryPolicy.StrategyOneofCase.CustomizedBackoff: RetryPolicy = CustomizedBackoffRetryPolicy.FromProtobuf(backoffPolicy); break; default: throw new ArgumentException("Unrecognized backoff policy strategy."); } } } }