src/Microsoft.Azure.SignalR.Management/HubContext/StreamingManagerAdapter.cs (71 lines of code) (raw):
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Azure.SignalR.Management.HubContext;
namespace Microsoft.Azure.SignalR.Management;
/// <summary>
/// <see cref="StreamingManager"/> implementation that forwards every item of a stream to an
/// <see cref="IStreamingHubLifetimeManager"/> and then sends a stream completion message.
/// </summary>
internal class StreamingManagerAdapter : StreamingManager
{
    private readonly IStreamingHubLifetimeManager _lifetimeManager;

    private readonly NegotiationOptions _negotiationOptions;

    /// <summary>
    /// Creates an adapter over the given lifetime manager.
    /// </summary>
    /// <param name="lifetimeManager">Receives the stream item and stream completion calls.</param>
    /// <param name="negotiationOptions">Its <c>EnableDetailedErrors</c> flag selects which error text is sent when streaming fails.</param>
    public StreamingManagerAdapter(IStreamingHubLifetimeManager lifetimeManager, NegotiationOptions negotiationOptions)
    {
        _lifetimeManager = lifetimeManager;
        _negotiationOptions = negotiationOptions;
    }

    /// <summary>
    /// Sends each element of <paramref name="items"/> to the connection as a stream item, then completes the stream.
    /// </summary>
    /// <param name="connectionId">Passed through to the lifetime manager with every call.</param>
    /// <param name="streamId">Passed through to the lifetime manager with every call.</param>
    /// <param name="items">The asynchronous sequence to forward; enumerated with <paramref name="cancellationToken"/>.</param>
    /// <param name="cancellationToken">Flowed into the enumeration and into every lifetime manager call.</param>
    public override async Task SendStreamAsync<TItem>(string connectionId, string streamId, IAsyncEnumerable<TItem> items, CancellationToken cancellationToken = default)
    {
        await ForwardAndCompleteAsync(
            connectionId,
            streamId,
            async () =>
            {
                await foreach (var element in items.WithCancellation(cancellationToken))
                {
                    await _lifetimeManager.SendStreamItemAsync(connectionId, streamId, element, cancellationToken);
                }
            },
            cancellationToken);
    }

    /// <summary>
    /// Drains <paramref name="channelReader"/>, sending each value to the connection as a stream item, then completes the stream.
    /// </summary>
    /// <param name="connectionId">Passed through to the lifetime manager with every call.</param>
    /// <param name="streamId">Passed through to the lifetime manager with every call.</param>
    /// <param name="channelReader">The channel to read from until <c>WaitToReadAsync</c> returns <c>false</c>.</param>
    /// <param name="cancellationToken">Flowed into the channel wait and into every lifetime manager call.</param>
    public override async Task SendStreamAsync<TItem>(string connectionId, string streamId, ChannelReader<TItem> channelReader, CancellationToken cancellationToken = default)
    {
        await ForwardAndCompleteAsync(
            connectionId,
            streamId,
            async () =>
            {
                while (await channelReader.WaitToReadAsync(cancellationToken))
                {
                    // Take everything that is already buffered before waiting again.
                    while (channelReader.TryRead(out var element))
                    {
                        await _lifetimeManager.SendStreamItemAsync(connectionId, streamId, element, cancellationToken);
                    }
                }
            },
            cancellationToken);
    }

    /// <summary>
    /// Runs <paramref name="forwardItemsAsync"/> and reports the outcome of the stream:
    /// a completion without error text when it finishes, nothing when it was cancelled through
    /// <paramref name="cancellationToken"/>, and a completion carrying an error text for any other exception.
    /// </summary>
    /// <remarks>
    /// The completion calls are made outside the guarded region, so an exception thrown while
    /// sending a completion propagates to the caller instead of being reported as a stream error.
    /// </remarks>
    private async Task ForwardAndCompleteAsync(string connectionId, string streamId, Func<Task> forwardItemsAsync, CancellationToken cancellationToken)
    {
        try
        {
            await forwardItemsAsync();
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // do not send anything if the stream is cancelled.
            return;
        }
        catch (Exception failure)
        {
            // The exception message is only sent when detailed errors are enabled.
            var errorText = _negotiationOptions.EnableDetailedErrors ? failure.Message : "An error occurred.";
            await _lifetimeManager.SendStreamCompletionAsync(connectionId, streamId, errorText, cancellationToken);
            return;
        }

        await _lifetimeManager.SendStreamCompletionAsync(connectionId, streamId, null, cancellationToken);
    }
}