public override async Task EventStream()

in src/WebJobs.Script.Grpc/Server/FunctionRpcService.cs [32:92]


        public override async Task EventStream(IAsyncStreamReader<StreamingMessage> requestStream, IServerStreamWriter<StreamingMessage> responseStream, ServerCallContext context)
        {
            var cancelSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            var cts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken);
            CancellationTokenRegistration ctr = cts.Token.Register(static state => ((TaskCompletionSource<bool>)state).TrySetResult(false), cancelSource);
            try
            {
                static Task<Task<bool>> MoveNextAsync(IAsyncStreamReader<StreamingMessage> requestStream, TaskCompletionSource<bool> cancelSource)
                {
                    // GRPC does not accept cancellation tokens for individual reads, hence wrapper
                    var requestTask = requestStream.MoveNext(CancellationToken.None);
                    return Task.WhenAny(cancelSource.Task, requestTask);
                }

                if (await await MoveNextAsync(requestStream, cancelSource))
                {
                    var currentMessage = requestStream.Current;
                    // expect first operation (and only the first; we don't support re-registration) to be StartStream
                    if (currentMessage.ContentCase == MsgType.StartStream)
                    {
                        var workerId = currentMessage.StartStream?.WorkerId;
                        if (!string.IsNullOrEmpty(workerId) && _eventManager.TryGetGrpcChannels(workerId, out var inbound, out var outbound))
                        {
                            // send work
                            _ = PushFromOutboundToGrpc(workerId, responseStream, outbound.Reader, cts.Token);

                            // this loop is "pull from gRPC and push to inbound"
                            do
                            {
                                currentMessage = requestStream.Current;
                                if (currentMessage.ContentCase == MsgType.InvocationResponse && !string.IsNullOrEmpty(currentMessage.InvocationResponse?.InvocationId))
                                {
                                    _logger.LogTrace("Received invocation response for invocationId: {invocationId} from workerId: {workerId}", currentMessage.InvocationResponse.InvocationId, workerId);
                                }
                                var newInbound = new InboundGrpcEvent(workerId, currentMessage);
                                if (!inbound.Writer.TryWrite(newInbound))
                                {
                                    await inbound.Writer.WriteAsync(newInbound);
                                }
                                currentMessage = null; // allow old messages to be collected while we wait
                            }
                            while (await await MoveNextAsync(requestStream, cancelSource));
                        }
                    }
                }
            }
            catch (Exception rpcException)
            {
                // We catch the exception, just to report it, then re-throw it
                _logger.LogError(rpcException, "Exception encountered while listening to EventStream");
                throw;
            }
            finally
            {
                cts.Cancel();
                ctr.Dispose();

                // ensure cancellationSource task completes
                cancelSource.TrySetResult(false);
            }
        }