src/Messaging/MessagingStream.cs (46 lines of code) (raw):
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//
using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.Azure.WebJobs.Script.Grpc.Messages;
namespace Microsoft.Azure.Functions.PowerShellWorker.Messaging
{
internal class MessagingStream
{
private readonly AsyncDuplexStreamingCall<StreamingMessage, StreamingMessage> _call;
private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(initialCount: 1, maxCount: 1);
internal MessagingStream(string host, int port)
{
// To call unsecured gRPC services, ensure the address starts with 'http' as opposed to 'https'.
// For more detail, see https://docs.microsoft.com/en-us/aspnet/core/grpc/client?view=aspnetcore-6.0
string uriString = $"http://{host}:{port}";
if (!Uri.TryCreate(uriString, UriKind.Absolute, out Uri grpcUri))
{
throw new InvalidOperationException($"The gRPC channel URI '{uriString}' could not be parsed.");
}
const int maxMessageLength = int.MaxValue;
var channelOptions = new GrpcChannelOptions
{
MaxReceiveMessageSize = maxMessageLength,
MaxSendMessageSize = maxMessageLength,
Credentials = ChannelCredentials.Insecure
};
GrpcChannel channel = GrpcChannel.ForAddress(grpcUri, channelOptions);
_call = new FunctionRpc.FunctionRpcClient(channel).EventStream();
}
/// <summary>
/// Get the current message.
/// </summary>
internal StreamingMessage GetCurrentMessage() => _call.ResponseStream.Current;
/// <summary>
/// Move to the next message.
/// </summary>
internal async Task<bool> MoveNext() => await _call.ResponseStream.MoveNext(CancellationToken.None);
/// <summary>
/// Write the outgoing message.
/// </summary>
internal void Write(StreamingMessage message) => WriteImplAsync(message).ConfigureAwait(false);
/// <summary>
/// Take a message from the buffer and write to the gRPC channel.
/// </summary>
private async Task WriteImplAsync(StreamingMessage message)
{
try
{
await _semaphoreSlim.WaitAsync();
await _call.RequestStream.WriteAsync(message);
}
finally
{
_semaphoreSlim.Release();
}
}
}
}