in src/WebJobs.Script.Grpc/Server/FunctionRpcService.cs [32:92]
/// <summary>
/// Services one gRPC event stream. The first message read from <paramref name="requestStream"/>
/// must be a StartStream message whose worker id resolves to a pair of channels via
/// <c>_eventManager.TryGetGrpcChannels</c>; otherwise the method returns without processing anything.
/// Once resolved, outbound messages are pushed to <paramref name="responseStream"/> by
/// <c>PushFromOutboundToGrpc</c>, while this method forwards every received message
/// (starting with the StartStream message itself) to the inbound channel until the request
/// stream ends or the call is cancelled.
/// </summary>
/// <param name="requestStream">Stream of messages received over gRPC.</param>
/// <param name="responseStream">Stream used to send messages back over gRPC.</param>
/// <param name="context">Call context; its cancellation token ends the read loop.</param>
/// <returns>A task that completes when the stream has ended or been cancelled.</returns>
public override async Task EventStream(IAsyncStreamReader<StreamingMessage> requestStream, IServerStreamWriter<StreamingMessage> responseStream, ServerCallContext context)
{
    // Completed with 'false' on cancellation so a pending read can be abandoned (see MoveNextAsync).
    var cancelSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    // The linked source holds a registration on the caller's token until it is disposed.
    // The using declaration disposes it after the finally block below has cancelled it,
    // on both the normal and the exceptional exit path.
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken);
    CancellationTokenRegistration ctr = cts.Token.Register(static state => ((TaskCompletionSource<bool>)state).TrySetResult(false), cancelSource);
    try
    {
        // Returns whichever finishes first: the next read or the cancellation signal.
        // Awaiting the result twice yields 'false' when cancellation wins.
        static Task<Task<bool>> MoveNextAsync(IAsyncStreamReader<StreamingMessage> requestStream, TaskCompletionSource<bool> cancelSource)
        {
            // GRPC does not accept cancellation tokens for individual reads, hence wrapper
            var requestTask = requestStream.MoveNext(CancellationToken.None);
            return Task.WhenAny(cancelSource.Task, requestTask);
        }

        if (await await MoveNextAsync(requestStream, cancelSource))
        {
            var currentMessage = requestStream.Current;

            // expect first operation (and only the first; we don't support re-registration) to be StartStream
            if (currentMessage.ContentCase == MsgType.StartStream)
            {
                var workerId = currentMessage.StartStream?.WorkerId;
                if (!string.IsNullOrEmpty(workerId) && _eventManager.TryGetGrpcChannels(workerId, out var inbound, out var outbound))
                {
                    // send work; intentionally not awaited, it is stopped through cts.Token
                    _ = PushFromOutboundToGrpc(workerId, responseStream, outbound.Reader, cts.Token);

                    // this loop is "pull from gRPC and push to inbound"
                    do
                    {
                        currentMessage = requestStream.Current;
                        if (currentMessage.ContentCase == MsgType.InvocationResponse && !string.IsNullOrEmpty(currentMessage.InvocationResponse?.InvocationId))
                        {
                            _logger.LogTrace("Received invocation response for invocationId: {invocationId} from workerId: {workerId}", currentMessage.InvocationResponse.InvocationId, workerId);
                        }

                        var newInbound = new InboundGrpcEvent(workerId, currentMessage);

                        // Try the synchronous write first; fall back to the awaitable write only if it is refused.
                        if (!inbound.Writer.TryWrite(newInbound))
                        {
                            await inbound.Writer.WriteAsync(newInbound);
                        }

                        currentMessage = null; // allow old messages to be collected while we wait
                    }
                    while (await await MoveNextAsync(requestStream, cancelSource));
                }
            }
        }
    }
    catch (Exception rpcException)
    {
        // We catch the exception, just to report it, then re-throw it
        _logger.LogError(rpcException, "Exception encountered while listening to EventStream");
        throw;
    }
    finally
    {
        // Signal the outbound push (which was handed cts.Token) to stop.
        cts.Cancel();
        ctr.Dispose();

        // ensure cancellationSource task completes
        cancelSource.TrySetResult(false);
    }
}