Http2StreamImpl _newStreamInternal()

in lib/src/streams/stream_handler.dart [294:355]


  Http2StreamImpl _newStreamInternal(int streamId) {
    // For each new stream we must:
    //   - setup sending/receiving [Window]s with correct initial size
    //   - setup sending/receiving WindowHandlers which take care of
    //     updating the windows.
    //   - setup incoming/outgoing stream queues, which buffer data
    //     that is not handled by
    //       * the application [incoming]
    //       * the underlying transport [outgoing]
    //   - register incoming stream queue in connection-level queue

    var outgoingStreamWindow =
        Window(initialSize: _peerSettings.initialWindowSize);

    var incomingStreamWindow =
        Window(initialSize: _localSettings.initialWindowSize);

    var windowOutHandler = OutgoingStreamWindowHandler(outgoingStreamWindow);

    var windowInHandler = IncomingWindowHandler.stream(
        _frameWriter, incomingStreamWindow, streamId);

    var streamQueueIn = StreamMessageQueueIn(windowInHandler);
    var streamQueueOut =
        StreamMessageQueueOut(streamId, windowOutHandler, outgoingQueue);

    incomingQueue.insertNewStreamMessageQueue(streamId, streamQueueIn);

    var _outgoingC = StreamController<StreamMessage>();
    var stream = Http2StreamImpl(streamQueueIn, streamQueueOut, _outgoingC,
        streamId, windowOutHandler, _canPush, _push, _terminateStream);
    final wasIdle = _openStreams.isEmpty;
    _openStreams[stream.id] = stream;

    _setupOutgoingMessageHandling(stream);

    // Handle incoming stream cancellation. RST is only sent when streamQueueOut
    // has been closed because RST make the stream 'closed'.
    streamQueueIn.onCancel.then((_) {
      // If our side is done sending data, i.e. we have enqueued the
      // end-of-stream in the outgoing message queue, but the remote end is
      // still sending us data, despite us not being interested in it, we will
      // reset the stream.
      if (stream.state == StreamState.HalfClosedLocal) {
        stream.outgoingQueue
            .enqueueMessage(ResetStreamMessage(stream.id, ErrorCode.CANCEL));
      }
    });

    // NOTE: We are not interested whether the streams were normally finished
    // or abnormally terminated. Therefore we use 'catchError((_) {})'!
    var streamDone = [streamQueueIn.done, streamQueueOut.done];
    Future.wait(streamDone).catchError((_) {}).whenComplete(() {
      _cleanupClosedStream(stream);
    });

    if (wasIdle) {
      _onActiveStateChanged(true);
    }

    return stream;
  }