in libraries/Microsoft.Bot.Connector.Streaming/Session/StreamingSession.cs [219:312]
public virtual void ReceiveStream(Header header, ArraySegment<byte> payload)
{
if (header == null)
{
throw new ArgumentNullException(nameof(header));
}
if (header.Type != PayloadTypes.Stream)
{
throw new InvalidOperationException($"StreamingSession cannot receive payload of type {header.Type} as stream");
}
if (payload == null)
{
throw new ArgumentNullException(nameof(payload));
}
Log.PayloadReceived(_logger, header);
// Find request for incoming stream header
if (_streamDefinitions.TryGetValue(header.Id, out StreamDefinition streamDefinition))
{
streamDefinition.Stream.Write(payload.Array, payload.Offset, payload.Count);
// Is this the end of this stream?
if (header.End)
{
// Mark this stream as completed
if (streamDefinition is StreamDefinition streamDef)
{
streamDef.Complete = true;
streamDef.Stream.Seek(0, SeekOrigin.Begin);
List<IContentStream> streams = null;
// Find the request / response
if (streamDef.PayloadType == PayloadTypes.Request)
{
if (_requests.TryGetValue(streamDef.PayloadId, out ReceiveRequest req))
{
streams = req.Streams;
}
}
else if (streamDef.PayloadType == PayloadTypes.Response)
{
if (_responses.TryGetValue(streamDef.PayloadId, out ReceiveResponse res))
{
streams = res.Streams;
}
}
if (streams != null)
{
lock (_receiveSync)
{
// Have we completed all the streams we expect for this request?
bool allStreamsDone = streams.All(s => s is StreamDefinition streamDef && streamDef.Complete);
// If we received all the streams, then it's time to pass this request to the request handler!
// For example, if this request is a send activity, the request handler will deserialize the first stream
// into an activity and pass to the adapter.
if (allStreamsDone)
{
if (streamDef.PayloadType == PayloadTypes.Request)
{
if (_requests.TryGetValue(streamDef.PayloadId, out ReceiveRequest request))
{
ProcessRequest(streamDef.PayloadId, request);
_requests.Remove(streamDef.PayloadId);
}
}
else if (streamDef.PayloadType == PayloadTypes.Response)
{
if (_responses.TryGetValue(streamDef.PayloadId, out ReceiveResponse response))
{
if (_pendingResponses.TryGetValue(streamDef.PayloadId, out TaskCompletionSource<ReceiveResponse> responseTask))
{
responseTask.SetResult(response);
_responses.Remove(streamDef.PayloadId);
_pendingResponses.TryRemove(streamDef.PayloadId, out TaskCompletionSource<ReceiveResponse> removedResponse);
}
}
}
}
}
}
}
}
}
else
{
Log.OrphanedStream(_logger, header);
}
}