in lib/src/http_impl.dart [1224:1298]
Future addStream(Stream<List<int>> stream) {
if (_socketError) {
stream.listen(null).cancel();
return Future.value(outbound);
}
if (ignoreBody) {
stream.drain().catchError((_) {});
var future = writeHeaders();
if (future != null) {
return future.then((_) => close());
}
return close();
}
StreamSubscription<List<int>> sub;
// Use new stream so we are able to pause (see below listen). The
// alternative is to use stream.extand, but that won't give us a way of
// pausing.
var controller = StreamController<List<int>>(
onPause: () => sub.pause(), onResume: () => sub.resume(), sync: true);
void onData(List<int> data) {
if (_socketError) return;
if (data.isEmpty) return;
if (chunked) {
if (_gzip) {
_gzipAdd = controller.add;
_addGZipChunk(data, _gzipSink.add);
_gzipAdd = null;
return;
}
_addChunk(_chunkHeader(data.length), controller.add);
_pendingChunkedFooter = 2;
} else {
if (contentLength != null) {
_bytesWritten += data.length;
if (_bytesWritten > contentLength) {
controller.addError(
HttpException("Content size exceeds specified contentLength. "
"$_bytesWritten bytes written while expected "
"$contentLength. "
"[${String.fromCharCodes(data)}]"));
return;
}
}
}
_addChunk(data, controller.add);
}
sub = stream.listen(onData,
onError: controller.addError,
onDone: controller.close,
cancelOnError: true);
// Write headers now that we are listening to the stream.
if (!headersWritten) {
var future = writeHeaders();
if (future != null) {
// While incoming is being drained, the pauseFuture is non-null. Pause
// output until it's drained.
sub.pause(future);
}
}
return socket.addStream(controller.stream).then((_) {
return outbound;
}, onError: (error, StackTrace stackTrace) {
// Be sure to close it in case of an error.
if (_gzip) _gzipSink.close();
_socketError = true;
_doneCompleter.completeError(error, stackTrace);
if (_ignoreError(error)) {
return outbound;
} else {
throw error;
}
});
}