in lib/src/streams/stream_handler.dart [294:355]
/// Creates the [Http2StreamImpl] for [streamId], wires up its flow-control
/// windows and message queues, and records it in `_openStreams`.
///
/// Calls `_onActiveStateChanged(true)` if no stream was open before this
/// call. Returns the newly created stream.
Http2StreamImpl _newStreamInternal(int streamId) {
  // For each new stream we must:
  //   - setup sending/receiving [Window]s with correct initial size
  //   - setup sending/receiving WindowHandlers which take care of
  //     updating the windows.
  //   - setup incoming/outgoing stream queues, which buffer data
  //     that is not handled by
  //       * the application [incoming]
  //       * the underlying transport [outgoing]
  //   - register incoming stream queue in connection-level queue

  // The outgoing window starts at the peer's advertised initial size, the
  // incoming window at our own local setting.
  var outgoingStreamWindow =
      Window(initialSize: _peerSettings.initialWindowSize);
  var incomingStreamWindow =
      Window(initialSize: _localSettings.initialWindowSize);

  var windowOutHandler = OutgoingStreamWindowHandler(outgoingStreamWindow);
  var windowInHandler = IncomingWindowHandler.stream(
      _frameWriter, incomingStreamWindow, streamId);

  var streamQueueIn = StreamMessageQueueIn(windowInHandler);
  var streamQueueOut =
      StreamMessageQueueOut(streamId, windowOutHandler, outgoingQueue);

  incomingQueue.insertNewStreamMessageQueue(streamId, streamQueueIn);

  var outgoingController = StreamController<StreamMessage>();
  var stream = Http2StreamImpl(
      streamQueueIn,
      streamQueueOut,
      outgoingController,
      streamId,
      windowOutHandler,
      _canPush,
      _push,
      _terminateStream);

  // Remember whether this is the first open stream *before* registering it,
  // so the active-state callback below only fires on that transition.
  final wasIdle = _openStreams.isEmpty;
  _openStreams[stream.id] = stream;

  _setupOutgoingMessageHandling(stream);

  // Handle incoming stream cancellation. RST is only sent when streamQueueOut
  // has been closed because RST makes the stream 'closed'.
  streamQueueIn.onCancel.then((_) {
    // If our side is done sending data, i.e. we have enqueued the
    // end-of-stream in the outgoing message queue, but the remote end is
    // still sending us data, despite us not being interested in it, we will
    // reset the stream.
    if (stream.state == StreamState.HalfClosedLocal) {
      stream.outgoingQueue
          .enqueueMessage(ResetStreamMessage(stream.id, ErrorCode.CANCEL));
    }
  });

  // NOTE: We are not interested whether the streams were normally finished
  // or abnormally terminated, so both outcomes are mapped to a plain `void`
  // completion before the cleanup runs.
  // `then<void>(..., onError: ...)` is used instead of `catchError((_) {})`:
  // a `catchError` handler has to return a value of the future's own type
  // (a list here), which an empty handler does not do under sound null
  // safety.
  var streamDone = [streamQueueIn.done, streamQueueOut.done];
  Future.wait(streamDone)
      .then<void>((_) {}, onError: (_) {})
      .whenComplete(() {
    _cleanupClosedStream(stream);
  });

  if (wasIdle) {
    _onActiveStateChanged(true);
  }

  return stream;
}