Future addStream()

in lib/src/http_impl.dart [1224:1298]


  /// Forwards [stream] to the underlying socket, framing the body as needed.
  ///
  /// Three situations are handled:
  ///
  /// * A socket error was already recorded: [stream] is listened to and
  ///   cancelled right away, and the returned future completes with
  ///   `outbound`.
  /// * The body is to be ignored: [stream] is drained with its errors
  ///   swallowed, the headers are written, and the result of `close()` is
  ///   returned.
  /// * Otherwise every non-empty list emitted by [stream] is relayed to the
  ///   socket. When `chunked` is set the data is preceded by a chunk header,
  ///   or handed to the gzip path when `_gzip` is set as well. When not
  ///   chunked and a `contentLength` is given, the bytes are counted and an
  ///   [HttpException] is added to the relayed stream instead of the data
  ///   once the count exceeds `contentLength`.
  ///
  /// If adding to the socket fails, `_socketError` is set and
  /// `_doneCompleter` is completed with the error. The returned future then
  /// completes with `outbound` when `_ignoreError` accepts the error, and
  /// with the error itself otherwise.
  Future addStream(Stream<List<int>> stream) {
    if (_socketError) {
      // Nothing is forwarded after a socket error: subscribe and cancel
      // immediately.
      stream.listen(null).cancel();
      return Future.value(outbound);
    }

    if (ignoreBody) {
      // Consume and discard the body, swallowing any error it produces.
      stream.drain().catchError((_) {});
      var headersDone = writeHeaders();
      if (headersDone == null) return close();
      return headersDone.then((_) => close());
    }

    // Relay through our own controller so that pause/resume requests from the
    // consumer can be passed on to the source subscription. The alternative
    // is to use stream.expand, but that won't give us a way of pausing.
    var relay = StreamController<List<int>>(sync: true);

    // Frames one list of bytes from the source and pushes it into [relay].
    void forward(List<int> bytes) {
      if (_socketError || bytes.isEmpty) return;
      if (!chunked) {
        if (contentLength != null) {
          _bytesWritten += bytes.length;
          if (_bytesWritten > contentLength) {
            relay.addError(
                HttpException("Content size exceeds specified contentLength. "
                    "$_bytesWritten bytes written while expected "
                    "$contentLength. "
                    "[${String.fromCharCodes(bytes)}]"));
            return;
          }
        }
      } else if (_gzip) {
        // _gzipAdd points at the relay only for the duration of this call.
        _gzipAdd = relay.add;
        _addGZipChunk(bytes, _gzipSink.add);
        _gzipAdd = null;
        return;
      } else {
        _addChunk(_chunkHeader(bytes.length), relay.add);
        _pendingChunkedFooter = 2;
      }
      _addChunk(bytes, relay.add);
    }

    var source = stream.listen(forward,
        onError: relay.addError, onDone: relay.close, cancelOnError: true);
    // The relay's stream is only handed to the socket further down, so these
    // callbacks are in place before anything can pause or resume it.
    relay.onPause = source.pause;
    relay.onResume = source.resume;

    // Write headers now that we are listening to the stream.
    if (!headersWritten) {
      var headersDone = writeHeaders();
      // A non-null future is returned while incoming is still being drained;
      // keep the source paused until that future completes.
      if (headersDone != null) source.pause(headersDone);
    }

    return socket.addStream(relay.stream).then((_) => outbound,
        onError: (error, StackTrace stackTrace) {
      // Be sure to close the gzip sink in case of an error.
      if (_gzip) _gzipSink.close();
      _socketError = true;
      _doneCompleter.completeError(error, stackTrace);
      // Rethrow the very same object so the returned future fails with it.
      if (!_ignoreError(error)) throw error;
      return outbound;
    });
  }