csharp/rocketmq-client-csharp/Session.cs (104 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Threading; using System.Threading.Tasks; using Grpc.Core; using Microsoft.Extensions.Logging; using Proto = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq { // refer to https://learn.microsoft.com/en-us/aspnet/core/grpc/client?view=aspnetcore-7.0#bi-directional-streaming-call. public class Session { private static readonly ILogger Logger = MqLogManager.CreateLogger<Session>(); private static readonly TimeSpan SettingsInitializationTimeout = TimeSpan.FromSeconds(3); private readonly ManualResetEventSlim _event = new ManualResetEventSlim(false); private readonly AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand> _streamingCall; private readonly Client _client; private readonly Endpoints _endpoints; private readonly SemaphoreSlim _semaphore; public Session(Endpoints endpoints, AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand> streamingCall, Client client) { _endpoints = endpoints; _semaphore = new SemaphoreSlim(1); _streamingCall = streamingCall; _client = client; Loop(); } public async Task WriteAsync(Proto.TelemetryCommand telemetryCommand) { var writer = _streamingCall.RequestStream; await writer.WriteAsync(telemetryCommand); } // TODO: Test concurrency. public async Task SyncSettings(bool awaitResp) { // Add more buffer time. await _semaphore.WaitAsync(); try { var settings = _client.GetSettings(); var telemetryCommand = new Proto.TelemetryCommand { Settings = settings.ToProtobuf() }; await WriteAsync(telemetryCommand); // await writer.CompleteAsync(); if (awaitResp) { _event.Wait(_client.GetClientConfig().RequestTimeout.Add(SettingsInitializationTimeout)); } } finally { _semaphore.Release(); } } private void Loop() { Task.Run(async () => { await foreach (var response in _streamingCall.ResponseStream.ReadAllAsync()) { switch (response.CommandCase) { case Proto.TelemetryCommand.CommandOneofCase.Settings: { Logger.LogInformation( $"Receive setting from remote, endpoints={_endpoints}, clientId={_client.GetClientId()}"); _client.OnSettingsCommand(_endpoints, response.Settings); _event.Set(); break; } case Proto.TelemetryCommand.CommandOneofCase.RecoverOrphanedTransactionCommand: { Logger.LogInformation( $"Receive orphaned transaction recovery command from remote, endpoints={_endpoints}, clientId={_client.GetClientId()}"); _client.OnRecoverOrphanedTransactionCommand(_endpoints, response.RecoverOrphanedTransactionCommand); break; } case Proto.TelemetryCommand.CommandOneofCase.VerifyMessageCommand: { Logger.LogInformation( $"Receive message verification command from remote, endpoints={_endpoints}, clientId={_client.GetClientId()}"); _client.OnVerifyMessageCommand(_endpoints, response.VerifyMessageCommand); break; } case Proto.TelemetryCommand.CommandOneofCase.PrintThreadStackTraceCommand: { Logger.LogInformation( $"Receive thread stack print command from remote, endpoints={_endpoints}, clientId={_client.GetClientId()}"); _client.OnPrintThreadStackTraceCommand(_endpoints, response.PrintThreadStackTraceCommand); break; } default: { Logger.LogWarning( $"Receive unrecognized command from remote, endpoints={_endpoints}, command={response}, clientId={_client.GetClientId()}"); break; } } } }); } }; }