public virtual void ReceiveStream()

in libraries/Microsoft.Bot.Connector.Streaming/Session/StreamingSession.cs [219:312]


        /// <summary>
        /// Handles one incoming stream payload. The bytes are appended to the stream registered under the
        /// header's id; when the header marks the end of that stream, the stream is flagged complete and
        /// rewound, and the owning request or response is dispatched once all of its streams are complete.
        /// A payload whose id has no registered stream is only logged as orphaned.
        /// </summary>
        /// <param name="header">Header describing the payload. Its type must be <c>PayloadTypes.Stream</c>.</param>
        /// <param name="payload">The bytes to append to the stream identified by <paramref name="header"/>.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="header"/> is null, or <paramref name="payload"/> has no backing array.
        /// </exception>
        /// <exception cref="InvalidOperationException">The header type is not <c>PayloadTypes.Stream</c>.</exception>
        public virtual void ReceiveStream(Header header, ArraySegment<byte> payload)
        {
            if (header == null)
            {
                throw new ArgumentNullException(nameof(header));
            }

            if (header.Type != PayloadTypes.Stream)
            {
                throw new InvalidOperationException($"StreamingSession cannot receive payload of type {header.Type} as stream");
            }

            // ArraySegment<byte> is a struct, so comparing the segment itself to null is never true.
            // What can actually be missing is its backing array (e.g. default(ArraySegment<byte>)).
            if (payload.Array == null)
            {
                throw new ArgumentNullException(nameof(payload));
            }

            Log.PayloadReceived(_logger, header);

            // Find the stream this payload belongs to.
            if (!_streamDefinitions.TryGetValue(header.Id, out StreamDefinition streamDefinition))
            {
                Log.OrphanedStream(_logger, header);
                return;
            }

            streamDefinition.Stream.Write(payload.Array, payload.Offset, payload.Count);

            // More payloads are still expected for this stream.
            if (!header.End)
            {
                return;
            }

            // Mark this stream as completed and rewind it so a consumer reads from the start.
            streamDefinition.Complete = true;
            streamDefinition.Stream.Seek(0, SeekOrigin.Begin);

            var payloadId = streamDefinition.PayloadId;

            // The "are all streams complete?" check, the dispatch and the removal are done together under
            // _receiveSync, so the lookup that feeds the check is the same one that feeds the dispatch.
            lock (_receiveSync)
            {
                if (streamDefinition.PayloadType == PayloadTypes.Request)
                {
                    // Once every stream of the request has arrived, hand it to the request handler.
                    if (_requests.TryGetValue(payloadId, out ReceiveRequest request)
                        && AllStreamsComplete(request.Streams))
                    {
                        ProcessRequest(payloadId, request);
                        _requests.Remove(payloadId);
                    }
                }
                else if (streamDefinition.PayloadType == PayloadTypes.Response)
                {
                    // A response is only delivered (and forgotten) when a pending task is waiting for it.
                    if (_responses.TryGetValue(payloadId, out ReceiveResponse response)
                        && AllStreamsComplete(response.Streams)
                        && _pendingResponses.TryGetValue(payloadId, out TaskCompletionSource<ReceiveResponse> pendingResponse))
                    {
                        // NOTE(review): SetResult throws if the task was already completed elsewhere
                        // (for example cancelled) - confirm that cannot happen for pending responses.
                        pendingResponse.SetResult(response);
                        _responses.Remove(payloadId);
                        _pendingResponses.TryRemove(payloadId, out _);
                    }
                }
            }

            // True when the list exists and every entry is a StreamDefinition flagged as complete.
            bool AllStreamsComplete(List<IContentStream> contentStreams)
            {
                return contentStreams != null
                    && contentStreams.All(candidate => candidate is StreamDefinition definition && definition.Complete);
            }
        }