in rocketmq-client-csharp/Session.cs [41:95]
/// <summary>
/// Drives the telemetry stream: sends the client settings once, then reads
/// telemetry commands from the server until the telemetry cancellation token
/// is cancelled or the server completes the response stream.
/// </summary>
/// <remarks>
/// The outcome of the initial handshake is published to <c>_channel</c> exactly
/// once: <c>true</c> when the first command is a settings command, <c>false</c>
/// when the first command carries no payload (treated here as a failure).
/// Within this method <c>_established</c> is used as a one-shot state flag:
/// 0 = no outcome yet, 1 = settings received, 2 = failed.
/// </remarks>
/// <returns>A task that completes when the loop exits and the request stream has been completed.</returns>
public async Task Loop()
{
    var reader = this._stream.ResponseStream;
    var writer = this._stream.RequestStream;

    // The first frame written to the stream carries the client settings.
    var request = new rmq::TelemetryCommand();
    request.Settings = new rmq::Settings();
    _client.BuildClientSetting(request.Settings);
    await writer.WriteAsync(request);
    Logger.Debug($"Writing Client Settings Done: {request.Settings.ToString()}");

    while (!_client.TelemetryCts().IsCancellationRequested)
    {
        if (!await reader.MoveNext(_client.TelemetryCts().Token))
        {
            // The server completed the response stream. Leave the loop here:
            // calling MoveNext again would only return false again and spin.
            Logger.Info("Telemetry stream completed by server");
            break;
        }

        var cmd = reader.Current;
        Logger.Debug($"Received a TelemetryCommand: {cmd.ToString()}");
        switch (cmd.CommandCase)
        {
            case rmq::TelemetryCommand.CommandOneofCase.None:
                {
                    Logger.Warn($"Telemetry failed: {cmd.Status}");
                    // CompareExchange(ref location, value, comparand): move 0 -> 2 and
                    // signal failure only if no outcome has been published yet.
                    if (0 == Interlocked.CompareExchange(ref _established, 2, 0))
                    {
                        await _channel.Writer.WriteAsync(false);
                    }
                    break;
                }
            case rmq::TelemetryCommand.CommandOneofCase.Settings:
                {
                    // Move 0 -> 1 and signal success only on the first outcome.
                    if (0 == Interlocked.CompareExchange(ref _established, 1, 0))
                    {
                        await _channel.Writer.WriteAsync(true);
                    }
                    Logger.Info($"Received settings from server {cmd.Settings.ToString()}");
                    _client.OnSettingsReceived(cmd.Settings);
                    break;
                }
            // These commands are received but intentionally not acted upon here.
            case rmq::TelemetryCommand.CommandOneofCase.PrintThreadStackTraceCommand:
            case rmq::TelemetryCommand.CommandOneofCase.RecoverOrphanedTransactionCommand:
            case rmq::TelemetryCommand.CommandOneofCase.VerifyMessageCommand:
                {
                    break;
                }
        }
    }

    if (_client.TelemetryCts().IsCancellationRequested)
    {
        Logger.Info("Telemetry stream cancelled");
    }
    await writer.CompleteAsync();
}