in lib/src/connection.dart [155:236]
/// Wires this connection to its transport.
///
/// Frames are decoded from [incoming] and written to [outgoing]. The
/// settings and ping handlers, the connection-level flow-control windows,
/// the incoming/outgoing message queues and the stream handler are created
/// here, and the initial settings derived from [settingsObject] are
/// submitted through the settings handler.
void _setupConnection(Stream<List<int>> incoming,
    StreamSink<List<int>> outgoing, Settings settingsObject) {
  // A read error, the end of the incoming frame stream and the completion of
  // the frame writer all tear the connection down in exactly the same way.
  void terminateOnTransportError() {
    _terminate(ErrorCode.CONNECT_ERROR, causedByTransportError: true);
  }

  // Frame reading: decode the raw bytes and hand every frame to the frame
  // handler, guarded against protocol errors.
  var frameStream =
      FrameReader(incoming, acknowledgedSettings).startDecoding();
  _frameReaderSubscription = frameStream.listen((Frame frame) {
    _catchProtocolErrors(() => _handleFrameImpl(frame));
  }, onError: (error, stack) {
    terminateOnTransportError();
  }, onDone: () {
    // Flush whatever the lower levels still hold up to the upper levels
    // before everything is terminated.
    _incomingQueue.forceDispatchIncomingMessages();
    _streams.forceDispatchIncomingMessages();
    terminateOnTransportError();
  });

  // Frame writing: terminate as soon as the writer reports it is done.
  _frameWriter = FrameWriter(_hpackContext.encoder, outgoing, peerSettings);
  _frameWriter.doneFuture.whenComplete(() {
    terminateOnTransportError();
  });

  // Settings and ping handlers.
  _settingsHandler = SettingsHandler(_hpackContext.encoder, _frameWriter,
      acknowledgedSettings, peerSettings);
  _pingHandler = PingHandler(_frameWriter);

  // Initial settings handshake (possibly with pushes disabled).
  var initialSettings = _decodeSettings(settingsObject);
  _settingsHandler.changeSettings(initialSettings).catchError((error) {
    // TODO: The [error] can contain sensitive information we now expose via
    // a [Goaway] frame. We should somehow ensure we're only sending useful
    // but non-sensitive information.
    _terminate(ErrorCode.PROTOCOL_ERROR,
        message: 'Failed to set initial settings (error: $error).');
  });

  // Forward every change of the initial window size setting to the streams.
  _settingsHandler.onInitialWindowSizeChange.listen((int delta) {
    _catchProtocolErrors(() {
      _streams.processInitialWindowSizeSettingChange(delta);
    });
  });

  // The connection window handler keeps track of the size of the outgoing
  // connection window.
  _connectionWindowHandler = OutgoingConnectionWindowHandler(_peerWindow);
  var localWindowUpdater =
      IncomingWindowHandler.connection(_frameWriter, _localWindow);

  // Connection-level queues for outgoing and incoming messages.
  _outgoingQueue =
      ConnectionMessageQueueOut(_connectionWindowHandler, _frameWriter);
  _incomingQueue =
      ConnectionMessageQueueIn(localWindowUpdater, _catchProtocolErrors);

  // Client and server connections use different stream handler constructors
  // but receive the same collaborators.
  _streams = isClientConnection
      ? StreamHandler.client(
          _frameWriter,
          _incomingQueue,
          _outgoingQueue,
          _settingsHandler.peerSettings,
          _settingsHandler.acknowledgedSettings,
          _activeStateHandler)
      : StreamHandler.server(
          _frameWriter,
          _incomingQueue,
          _outgoingQueue,
          _settingsHandler.peerSettings,
          _settingsHandler.acknowledgedSettings,
          _activeStateHandler);

  // NOTE: The connection is used right away; we do not wait for the initial
  // settings exchange to finish (i.e. we don't wait for half a
  // round-trip-time).
  _state = ConnectionState();
}