csharp/rocketmq-client-csharp/Client.cs (423 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading;
using System;
using System.Linq;
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;
using Proto = Apache.Rocketmq.V2;
using grpcLib = Grpc.Core;

[assembly: InternalsVisibleTo("tests")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]

namespace Org.Apache.Rocketmq
{
    /// <summary>
    /// Base class shared by the concrete RocketMQ clients. It owns the topic route cache, the
    /// per-endpoint telemetry sessions and the periodic background tasks (topic route refresh,
    /// heartbeat, settings synchronization and statistics logging).
    /// </summary>
    public abstract class Client
    {
        private static readonly ILogger Logger = MqLogManager.CreateLogger<Client>();

        // Heartbeat: first run after 1 second, then every 10 seconds.
        private static readonly TimeSpan HeartbeatScheduleDelay = TimeSpan.FromSeconds(1);
        private static readonly TimeSpan HeartbeatSchedulePeriod = TimeSpan.FromSeconds(10);
        private readonly CancellationTokenSource _heartbeatCts;

        // Topic route refresh: first run after 10 seconds, then every 30 seconds.
        private static readonly TimeSpan TopicRouteUpdateScheduleDelay = TimeSpan.FromSeconds(10);
        private static readonly TimeSpan TopicRouteUpdateSchedulePeriod = TimeSpan.FromSeconds(30);
        private readonly CancellationTokenSource _topicRouteUpdateCts;

        // Settings synchronization: first run after 1 second, then every 5 minutes.
        private static readonly TimeSpan SettingsSyncScheduleDelay = TimeSpan.FromSeconds(1);
        private static readonly TimeSpan SettingsSyncSchedulePeriod = TimeSpan.FromMinutes(5);
        private readonly CancellationTokenSource _settingsSyncCts;

        // Statistics logging: first run after 1 second, then every 60 seconds.
        private static readonly TimeSpan StatsScheduleDelay = TimeSpan.FromSeconds(1);
        private static readonly TimeSpan StatsSchedulePeriod = TimeSpan.FromSeconds(60);
        private readonly CancellationTokenSource _statsCts;

        protected readonly ClientConfig ClientConfig;
        protected readonly Endpoints Endpoints;
        protected IClientManager ClientManager;
        protected readonly string ClientId;
        protected readonly ClientMeterManager ClientMeterManager;

        // Endpoints currently marked as isolated; an entry is removed once a heartbeat to it succeeds.
        protected readonly ConcurrentDictionary<Endpoints, bool> Isolated;

        // Latest route data per topic, written by OnTopicRouteDataFetched.
        private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteCache;

        // Telemetry sessions per endpoint; every access is guarded by _sessionLock.
        private readonly Dictionary<Endpoints, Session> _sessionsTable;
        private readonly ReaderWriterLockSlim _sessionLock;

        internal volatile State State;

        /// <summary>
        /// Initializes the client from the given configuration. No background task is started and
        /// no remote call is issued here; that happens in <c>Start</c>.
        /// </summary>
        /// <param name="clientConfig">Configuration providing, among others, the access endpoints.</param>
        protected Client(ClientConfig clientConfig)
        {
            ClientConfig = clientConfig;
            Endpoints = new Endpoints(clientConfig.Endpoints);
            ClientId = Utilities.GetClientId();
            ClientMeterManager = new ClientMeterManager(this);

            ClientManager = new ClientManager(this);
            Isolated = new ConcurrentDictionary<Endpoints, bool>();
            _topicRouteCache = new ConcurrentDictionary<string, TopicRouteData>();

            _topicRouteUpdateCts = new CancellationTokenSource();
            _settingsSyncCts = new CancellationTokenSource();
            _heartbeatCts = new CancellationTokenSource();
            _statsCts = new CancellationTokenSource();

            _sessionsTable = new Dictionary<Endpoints, Session>();
            _sessionLock = new ReaderWriterLockSlim();

            State = State.New;
        }

        /// <summary>
        /// Fetches the route of every topic returned by <c>GetTopics</c> and then schedules the
        /// periodic background tasks. A failure while fetching any topic route propagates to the caller.
        /// </summary>
        protected virtual async Task Start()
        {
            Logger.LogDebug($"Begin to start the rocketmq client, clientId={ClientId}");
            foreach (var topic in GetTopics())
            {
                await FetchTopicRoute(topic);
            }

            ScheduleWithFixedDelay(UpdateTopicRouteCache, TopicRouteUpdateScheduleDelay,
                TopicRouteUpdateSchedulePeriod, _topicRouteUpdateCts.Token);
            ScheduleWithFixedDelay(Heartbeat, HeartbeatScheduleDelay, HeartbeatSchedulePeriod,
                _heartbeatCts.Token);
            ScheduleWithFixedDelay(SyncSettings, SettingsSyncScheduleDelay, SettingsSyncSchedulePeriod,
                _settingsSyncCts.Token);
            ScheduleWithFixedDelay(Stats, StatsScheduleDelay, StatsSchedulePeriod, _statsCts.Token);
            Logger.LogDebug($"Start the rocketmq client successfully, clientId={ClientId}");
        }

        /// <summary>
        /// Cancels the periodic background tasks, notifies the remote endpoints that this client is
        /// terminating, and then shuts down the client manager and the meter manager.
        /// </summary>
        protected virtual async Task Shutdown()
        {
            Logger.LogDebug($"Begin to shutdown rocketmq client, clientId={ClientId}");
            _heartbeatCts.Cancel();
            _topicRouteUpdateCts.Cancel();
            _settingsSyncCts.Cancel();
            _statsCts.Cancel();
            // Await the notification so that it is sent while the client manager is still usable;
            // firing it without awaiting would race with the shutdown below.
            await NotifyClientTermination();
            await ClientManager.Shutdown();
            ClientMeterManager.Shutdown();
            Logger.LogDebug($"Shutdown the rocketmq client successfully, clientId={ClientId}");
        }

        /// <summary>
        /// Returns the telemetry session of the given endpoints, creating it when none exists yet.
        /// </summary>
        /// <param name="endpoints">Endpoints the session is bound to.</param>
        /// <returns>
        /// A tuple whose first item is <c>true</c> only when the session was created by this call,
        /// and whose second item is the session itself.
        /// </returns>
        private protected (bool, Session) GetSession(Endpoints endpoints)
        {
            _sessionLock.EnterReadLock();
            try
            {
                // Session exists, return in advance.
                if (_sessionsTable.TryGetValue(endpoints, out var session))
                {
                    return (false, session);
                }
            }
            finally
            {
                _sessionLock.ExitReadLock();
            }

            _sessionLock.EnterWriteLock();
            try
            {
                // Check again: another thread may have created the session between releasing the
                // read lock and acquiring the write lock.
                if (_sessionsTable.TryGetValue(endpoints, out var session))
                {
                    return (false, session);
                }

                var stream = ClientManager.Telemetry(endpoints);
                var created = new Session(endpoints, stream, this);
                _sessionsTable.Add(endpoints, created);
                return (true, created);
            }
            finally
            {
                _sessionLock.ExitWriteLock();
            }
        }

        /// <summary>
        /// Returns the topics whose routes are fetched on start and refreshed periodically.
        /// </summary>
        protected abstract IEnumerable<string> GetTopics();

        /// <summary>
        /// Builds the heartbeat request that is sent to every endpoint of the route table.
        /// </summary>
        internal abstract Proto::HeartbeatRequest WrapHeartbeatRequest();

        /// <summary>
        /// Invoked after the route cache entry of <paramref name="topic"/> has been replaced.
        /// </summary>
        protected abstract void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData);

        /// <summary>
        /// Handles freshly fetched route data: establishes a session (including an initial settings
        /// synchronization) for every endpoint not present in the current route table, stores the
        /// data in the route cache and finally calls <c>OnTopicRouteDataUpdated0</c>.
        /// </summary>
        /// <param name="topic">Topic the route data belongs to.</param>
        /// <param name="topicRouteData">Route data that has just been fetched.</param>
        internal async Task OnTopicRouteDataFetched(string topic, TopicRouteData topicRouteData)
        {
            var routeEndpoints = new HashSet<Endpoints>();
            foreach (var mq in topicRouteData.MessageQueues)
            {
                routeEndpoints.Add(mq.Broker.Endpoints);
            }

            var existedRouteEndpoints = GetTotalRouteEndpoints();
            var newEndpoints = routeEndpoints.Except(existedRouteEndpoints);

            foreach (var endpoints in newEndpoints)
            {
                var (created, session) = GetSession(endpoints);
                if (!created)
                {
                    continue;
                }

                Logger.LogInformation($"Begin to establish session for endpoints={endpoints}, clientId={ClientId}");
                await session.SyncSettings(true);
                Logger.LogInformation($"Establish session for endpoints={endpoints} successfully, clientId={ClientId}");
            }

            _topicRouteCache[topic] = topicRouteData;
            OnTopicRouteDataUpdated0(topic, topicRouteData);
        }

        /// <summary>
        /// Returns all endpoints of brokers in the route table.
        /// </summary>
        private HashSet<Endpoints> GetTotalRouteEndpoints()
        {
            var endpoints = new HashSet<Endpoints>();
            foreach (var item in _topicRouteCache)
            {
                foreach (var endpoint in item.Value.MessageQueues.Select(mq => mq.Broker.Endpoints))
                {
                    endpoints.Add(endpoint);
                }
            }

            return endpoints;
        }

        /// <summary>
        /// Scheduled task: starts a route fetch for every topic of <c>GetTopics</c> and then awaits
        /// them one by one, logging each failure. Never throws.
        /// </summary>
        private async void UpdateTopicRouteCache()
        {
            try
            {
                Logger.LogInformation($"Start to update topic route cache for a new round, clientId={ClientId}");
                Dictionary<string, Task<TopicRouteData>> responses = new Dictionary<string, Task<TopicRouteData>>();

                // Start all fetches first so that they run concurrently.
                foreach (var topic in GetTopics())
                {
                    var task = FetchTopicRoute(topic);
                    responses[topic] = task;
                }

                foreach (var item in responses.Keys)
                {
                    try
                    {
                        await responses[item];
                    }
                    catch (Exception e)
                    {
                        Logger.LogError(e, $"Failed to update topic route cache, topic={item}");
                    }
                }
            }
            catch (Exception e)
            {
                Logger.LogError(e, $"[Bug] unexpected exception raised during topic route cache update, " +
                                   $"clientId={ClientId}");
            }
        }

        /// <summary>
        /// Scheduled task: synchronizes the settings with every endpoint of the route table,
        /// one endpoint after another. Never throws.
        /// </summary>
        private async void SyncSettings()
        {
            try
            {
                var totalRouteEndpoints = GetTotalRouteEndpoints();
                foreach (var endpoints in totalRouteEndpoints)
                {
                    var (_, session) = GetSession(endpoints);
                    await session.SyncSettings(false);
                    Logger.LogInformation($"Sync settings to remote, endpoints={endpoints}");
                }
            }
            catch (Exception e)
            {
                Logger.LogError(e, $"[Bug] unexpected exception raised during setting sync, clientId={ClientId}");
            }
        }

        /// <summary>
        /// Scheduled task: logs the client id, client version, runtime version and thread pool counters.
        /// </summary>
        private void Stats()
        {
            ThreadPool.GetAvailableThreads(out var availableWorker, out var availableIo);
            Logger.LogInformation(
                $"ClientId={ClientId}, ClientVersion={MetadataConstants.Instance.ClientVersion}, " +
                $".NET Version={Environment.Version}, ThreadCount={ThreadPool.ThreadCount}, " +
                $"CompletedWorkItemCount={ThreadPool.CompletedWorkItemCount}, " +
                $"PendingWorkItemCount={ThreadPool.PendingWorkItemCount}, AvailableWorkerThreads={availableWorker}, " +
                $"AvailableCompletionPortThreads={availableIo}");
        }

        /// <summary>
        /// Runs <paramref name="action"/> repeatedly via <c>Task.Run</c>: waits <paramref name="delay"/>
        /// once, then invokes the action and waits <paramref name="period"/> after each invocation, until
        /// <paramref name="token"/> is canceled. Exceptions thrown synchronously by the action are logged.
        /// </summary>
        /// <remarks>
        /// The <c>async void</c> tasks scheduled by <c>Start</c> return to this loop at their first
        /// await, so for them the period is counted from that point rather than from their completion.
        /// </remarks>
        /// <param name="action">Work to execute on every round.</param>
        /// <param name="delay">Delay before the first invocation.</param>
        /// <param name="period">Delay between two consecutive invocations.</param>
        /// <param name="token">Token that stops the loop when canceled.</param>
        private protected void ScheduleWithFixedDelay(Action action, TimeSpan delay, TimeSpan period,
            CancellationToken token)
        {
            Task.Run(async () =>
            {
                await Task.Delay(delay, token);
                while (!token.IsCancellationRequested)
                {
                    try
                    {
                        action();
                    }
                    catch (Exception e)
                    {
                        Logger.LogError(e, $"Failed to execute scheduled task, ClientId={ClientId}");
                    }
                    finally
                    {
                        await Task.Delay(period, token);
                    }
                }
            }, token);
        }

        /// <summary>
        /// Returns the route data of a topic from the cache, fetching it remotely on a cache miss.
        /// </summary>
        /// <param name="topic">Topic to resolve.</param>
        /// <returns>The cached or freshly fetched route data.</returns>
        protected async Task<TopicRouteData> GetRouteData(string topic)
        {
            if (_topicRouteCache.TryGetValue(topic, out var topicRouteData))
            {
                return topicRouteData;
            }

            topicRouteData = await FetchTopicRoute(topic);
            return topicRouteData;
        }

        /// <summary>
        /// Fetches the route of a topic remotely and passes it to <c>OnTopicRouteDataFetched</c>.
        /// </summary>
        private async Task<TopicRouteData> FetchTopicRoute(string topic)
        {
            var topicRouteData = await FetchTopicRoute0(topic);
            await OnTopicRouteDataFetched(topic, topicRouteData);
            Logger.LogInformation(
                $"Fetch topic route successfully, clientId={ClientId}, topic={topic}, topicRouteData={topicRouteData}");
            return topicRouteData;
        }

        /// <summary>
        /// Issues the route query against the configured access endpoints and converts the response.
        /// Any failure is logged and rethrown.
        /// </summary>
        private async Task<TopicRouteData> FetchTopicRoute0(string topic)
        {
            try
            {
                var request = new Proto::QueryRouteRequest
                {
                    Topic = new Proto::Resource
                    {
                        ResourceNamespace = ClientConfig.Namespace,
                        Name = topic
                    },
                    Endpoints = Endpoints.ToProtobuf()
                };

                var invocation = await ClientManager.QueryRoute(Endpoints, request, ClientConfig.RequestTimeout);
                var code = invocation.Response.Status.Code;
                if (!Proto.Code.Ok.Equals(code))
                {
                    Logger.LogError($"Failed to fetch topic route, clientId={ClientId}, topic={topic}, code={code}, " +
                                    $"statusMessage={invocation.Response.Status.Message}");
                }

                StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);

                var messageQueues = invocation.Response.MessageQueues.ToList();
                return new TopicRouteData(messageQueues);
            }
            catch (Exception e)
            {
                Logger.LogError(e, $"Failed to fetch topic route, clientId={ClientId}, topic={topic}");
                throw;
            }
        }

        /// <summary>
        /// Scheduled task: sends a heartbeat to every endpoint of the route table and processes each
        /// response. An endpoint that answers successfully is removed from <c>Isolated</c>. Never throws.
        /// </summary>
        private async void Heartbeat()
        {
            try
            {
                var endpoints = GetTotalRouteEndpoints();
                var request = WrapHeartbeatRequest();
                var invocations =
                    new Dictionary<Endpoints, Task<RpcInvocation<Proto.HeartbeatRequest, Proto.HeartbeatResponse>>>();

                // Collect task into a map.
                foreach (var item in endpoints)
                {
                    var task = ClientManager.Heartbeat(item, request, ClientConfig.RequestTimeout);
                    invocations[item] = task;
                }

                foreach (var item in invocations.Keys)
                {
                    try
                    {
                        var invocation = await invocations[item];
                        var code = invocation.Response.Status.Code;

                        if (code.Equals(Proto.Code.Ok))
                        {
                            Logger.LogInformation($"Send heartbeat successfully, endpoints={item}, clientId={ClientId}");
                            if (Isolated.TryRemove(item, out _))
                            {
                                Logger.LogInformation($"Rejoin endpoints which was isolated before, endpoints={item}, " +
                                                      $"clientId={ClientId}");
                            }

                            // Move on to the next endpoint. Returning here would leave the responses
                            // of all remaining endpoints unprocessed.
                            continue;
                        }

                        var statusMessage = invocation.Response.Status.Message;
                        Logger.LogInformation($"Failed to send heartbeat, endpoints={item}, code={code}, " +
                                              $"statusMessage={statusMessage}, clientId={ClientId}");
                    }
                    catch (Exception e)
                    {
                        Logger.LogError(e, $"Failed to send heartbeat, endpoints={item}");
                    }
                }
            }
            catch (Exception e)
            {
                Logger.LogError(e, $"[Bug] unexpected exception raised during heartbeat, clientId={ClientId}");
            }
        }

        /// <summary>
        /// Creates gRPC metadata and lets <c>Signature.Sign</c> populate it for this client.
        /// </summary>
        internal grpcLib.Metadata Sign()
        {
            var metadata = new grpcLib::Metadata();
            Signature.Sign(this, metadata);
            return metadata;
        }

        /// <summary>
        /// Builds the request used to tell remote endpoints that this client is terminating.
        /// </summary>
        internal abstract Proto::NotifyClientTerminationRequest WrapNotifyClientTerminationRequest();

        /// <summary>
        /// Notifies every endpoint of the route table that this client is terminating. This is
        /// best-effort: a failure for one endpoint is logged and the remaining endpoints are still
        /// notified. The returned task never faults.
        /// </summary>
        private async Task NotifyClientTermination()
        {
            try
            {
                Logger.LogInformation($"Notify remote endpoints that current client is terminated, clientId={ClientId}");
                var endpoints = GetTotalRouteEndpoints();
                var request = WrapNotifyClientTerminationRequest();
                foreach (var item in endpoints)
                {
                    try
                    {
                        var invocation = await ClientManager.NotifyClientTermination(item, request,
                            ClientConfig.RequestTimeout);
                        StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
                    }
                    catch (Exception e)
                    {
                        Logger.LogError(e, $"Failed to notify client's termination, clientId={ClientId}, " +
                                           $"endpoints={item}");
                    }
                }
            }
            catch (Exception e)
            {
                // Keep shutdown going even if the request cannot be built.
                Logger.LogError(e, $"[Bug] unexpected exception raised during client termination notification, " +
                                   $"clientId={ClientId}");
            }
        }

        /// <summary>
        /// Returns the settings object of the concrete client.
        /// </summary>
        internal abstract Settings GetSettings();

        /// <summary>Returns the identifier of this client.</summary>
        internal string GetClientId()
        {
            return ClientId;
        }

        /// <summary>Returns the configuration this client was created with.</summary>
        internal ClientConfig GetClientConfig()
        {
            return ClientConfig;
        }

        /// <summary>Returns the client manager currently in use.</summary>
        internal IClientManager GetClientManager()
        {
            return ClientManager;
        }

        // Only for testing
        internal void SetClientManager(IClientManager clientManager)
        {
            ClientManager = clientManager;
        }

        /// <summary>
        /// Default handler for an orphaned transaction recovery command: logs a warning and ignores it.
        /// </summary>
        internal virtual void OnRecoverOrphanedTransactionCommand(Endpoints endpoints,
            Proto.RecoverOrphanedTransactionCommand command)
        {
            Logger.LogWarning($"Ignore orphaned transaction recovery command from remote, which is not expected, " +
                              $"clientId={ClientId}, endpoints={endpoints}");
        }

        /// <summary>
        /// Default handler for a verify message command: logs a warning and replies with an
        /// <c>Unsupported</c> status carrying the nonce of the command.
        /// </summary>
        internal virtual async void OnVerifyMessageCommand(Endpoints endpoints, Proto.VerifyMessageCommand command)
        {
            // Only push consumer support message consumption verification.
            Logger.LogWarning($"Ignore verify message command from remote, which is not expected, clientId={ClientId}, " +
                              $"endpoints={endpoints}, command={command}");
            var status = new Proto.Status
            {
                Code = Proto.Code.Unsupported,
                Message = "Message consumption verification is not supported"
            };
            var verifyMessageResult = new Proto.VerifyMessageResult
            {
                Nonce = command.Nonce
            };
            var telemetryCommand = new Proto.TelemetryCommand
            {
                VerifyMessageResult = verifyMessageResult,
                Status = status
            };

            try
            {
                var (_, session) = GetSession(endpoints);
                await session.WriteAsync(telemetryCommand);
            }
            catch (Exception e)
            {
                // This method is async void, so an escaping exception could not be observed by the caller.
                Logger.LogError(e, $"Failed to reply to verify message command, clientId={ClientId}, " +
                                   $"endpoints={endpoints}");
            }
        }

        /// <summary>
        /// Handler for a thread stack trace printing command: logs a warning and replies with an
        /// <c>Unsupported</c> status carrying the nonce of the command.
        /// </summary>
        internal async void OnPrintThreadStackTraceCommand(Endpoints endpoints,
            Proto.PrintThreadStackTraceCommand command)
        {
            Logger.LogWarning("Ignore thread stack trace printing command from remote because it is still not supported, " +
                              $"clientId={ClientId}, endpoints={endpoints}");
            var status = new Proto.Status
            {
                Code = Proto.Code.Unsupported,
                Message = "C# don't support thread stack trace printing"
            };
            var threadStackTrace = new Proto.ThreadStackTrace
            {
                Nonce = command.Nonce
            };
            var telemetryCommand = new Proto.TelemetryCommand
            {
                ThreadStackTrace = threadStackTrace,
                Status = status,
            };

            try
            {
                var (_, session) = GetSession(endpoints);
                await session.WriteAsync(telemetryCommand);
            }
            catch (Exception e)
            {
                // This method is async void, so an escaping exception could not be observed by the caller.
                Logger.LogError(e, $"Failed to reply to thread stack trace printing command, clientId={ClientId}, " +
                                   $"endpoints={endpoints}");
            }
        }

        /// <summary>
        /// Applies settings received from the remote: resets the meter manager with the received
        /// metric (or an empty one when none is present) and synchronizes the client settings.
        /// </summary>
        internal void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings)
        {
            var metric = new Metric(settings.Metric ?? new Proto.Metric());
            ClientMeterManager.Reset(metric);
            GetSettings().Sync(settings);
        }
    }
}