in lib/request.js [568:673]
/**
 * Sends this request and exposes the raw HTTP response body as a readable
 * stream instead of buffering it.
 *
 * The kind of stream returned depends on `AWS.HttpClient.streamsApiVersion`:
 *
 * - version 2: a `PassThrough` stream; the request is sent on the next tick.
 * - otherwise: a plain `Stream` flagged `readable`; the request is only sent
 *   (on the next tick) once the first 'data' listener is attached.
 *
 * Request-level 'error' events and errors from the underlying HTTP stream are
 * re-emitted on the returned stream. When the response (status < 300) carries
 * a usable `content-length` header and the request method is not HEAD, the
 * number of bytes received is compared against it when the body ends; on a
 * mismatch the stream emits an error with code `StreamContentLengthMismatch`
 * instead of ending.
 *
 * @return [Stream] the stream on which the response body is delivered.
 */
createReadStream: function createReadStream() {
var streams = AWS.util.stream;
var req = this;
var stream = null;
if (AWS.HttpClient.streamsApiVersion === 2) {
// Streams2: hand back a PassThrough and kick the request off asynchronously.
stream = new streams.PassThrough();
process.nextTick(function() { req.send(); });
} else {
// Legacy streams: build a bare readable Stream and defer sending until a
// consumer subscribes to 'data'. `stream.sent` guards against sending twice
// when several 'data' listeners are added.
stream = new streams.Stream();
stream.readable = true;
stream.sent = false;
stream.on('newListener', function(event) {
if (!stream.sent && event === 'data') {
stream.sent = true;
process.nextTick(function() { req.send(); });
}
});
}
// Surface request-level errors to the stream consumer.
this.on('error', function(err) {
stream.emit('error', err);
});
this.on('httpHeaders', function streamHeaders(statusCode, headers, resp) {
// Only responses with a status below 300 are streamed; for anything else
// this handler leaves the request's existing listeners in place.
if (statusCode < 300) {
// Swap out the core HTTP_DATA / HTTP_ERROR listeners for this request
// (presumably the default body-buffering and error handlers; confirm in
// AWS.EventListeners.Core) so the body is consumed via the stream below.
req.removeListener('httpData', AWS.EventListeners.Core.HTTP_DATA);
req.removeListener('httpError', AWS.EventListeners.Core.HTTP_ERROR);
// Record an HTTP error on the response and mark it as non-retryable.
req.on('httpError', function streamHttpError(error) {
resp.error = error;
resp.error.retryable = false;
});
var shouldCheckContentLength = false;
var expectedLen;
// The content-length header is ignored for HEAD requests (presumably because
// a HEAD response has no body to count).
if (req.httpRequest.method !== 'HEAD') {
expectedLen = parseInt(headers['content-length'], 10);
}
// Enable the length check only when the header parsed to a non-negative
// number. NOTE: `receivedLen` is declared with `var`, so despite appearing
// inside this `if` it is scoped to the whole `streamHeaders` function and is
// visible to the closures below (it stays `undefined` when the check is off).
if (expectedLen !== undefined && !isNaN(expectedLen) && expectedLen >= 0) {
shouldCheckContentLength = true;
var receivedLen = 0;
}
// Called when the body has finished: either reports a length mismatch as an
// error on the stream, or ends the stream (`end()` for the PassThrough,
// a manual 'end' event for the legacy Stream).
var checkContentLengthAndEmit = function checkContentLengthAndEmit() {
if (shouldCheckContentLength && receivedLen !== expectedLen) {
stream.emit('error', AWS.util.error(
new Error('Stream content length mismatch. Received ' +
receivedLen + ' of ' + expectedLen + ' bytes.'),
{ code: 'StreamContentLengthMismatch' }
));
} else if (AWS.HttpClient.streamsApiVersion === 2) {
stream.end();
} else {
stream.emit('end');
}
};
var httpStream = resp.httpResponse.createUnbufferedStream();
if (AWS.HttpClient.streamsApiVersion === 2) {
if (shouldCheckContentLength) {
// Insert an intermediate PassThrough whose `_write` tallies the bytes
// flowing through before delegating to the stock implementation.
var lengthAccumulator = new streams.PassThrough();
lengthAccumulator._write = function(chunk) {
if (chunk && chunk.length) {
receivedLen += chunk.length;
}
return streams.PassThrough.prototype._write.apply(this, arguments);
};
lengthAccumulator.on('end', checkContentLengthAndEmit);
// If the output stream errors, turn the length check off first, then detach
// and finish the accumulator; the manual 'end' therefore runs
// checkContentLengthAndEmit down its non-mismatch path (stream.end()).
stream.on('error', function(err) {
shouldCheckContentLength = false;
httpStream.unpipe(lengthAccumulator);
lengthAccumulator.emit('end');
lengthAccumulator.end();
});
// `end: false` keeps pipe() from ending `stream` automatically, so that
// checkContentLengthAndEmit decides whether it ends or errors.
httpStream.pipe(lengthAccumulator).pipe(stream, { end: false });
} else {
// No length to verify: pipe the body straight through.
httpStream.pipe(stream);
}
} else {
// Legacy streams: forward events by hand.
if (shouldCheckContentLength) {
// Registered before the forwarding listener below, so the byte count is
// updated before each chunk is re-emitted.
httpStream.on('data', function(arg) {
if (arg && arg.length) {
receivedLen += arg.length;
}
});
}
httpStream.on('data', function(arg) {
stream.emit('data', arg);
});
httpStream.on('end', checkContentLengthAndEmit);
}
// Errors from the underlying HTTP stream disable the length check and are
// forwarded to the consumer.
httpStream.on('error', function(err) {
shouldCheckContentLength = false;
stream.emit('error', err);
});
}
});
return stream;
},