createReadStream: function createReadStream()

in src/aws-client.js [10071:10176]


    /**
     * Sends this request and exposes the HTTP response body as a readable
     * stream instead of buffering it.
     *
     * The kind of stream returned, and the moment the request is sent, depend
     * on `AWS.HttpClient.streamsApiVersion`:
     *
     * - `2`: a `PassThrough` stream is returned and the request is sent on the
     *   next tick.
     * - otherwise: a legacy `Stream` (with `readable = true`) is returned and
     *   the request is sent on the next tick after the first `'data'` listener
     *   is attached to it.
     *
     * Response bytes are only forwarded when the response status code is below
     * 300; for other status codes the `httpHeaders` handler installed here does
     * nothing. When the response carries a usable `content-length` header (and
     * the request method is not `HEAD`), the number of bytes received is
     * compared against it at end of stream, and a
     * `StreamContentLengthMismatch` error is emitted on the returned stream if
     * they differ.
     *
     * Errors emitted by the request (`'error'` event) and by the underlying
     * HTTP stream are re-emitted as `'error'` on the returned stream.
     *
     * @return {Stream} the stream on which response data, `'end'` and
     *   `'error'` are delivered.
     */
    createReadStream: function createReadStream() {
      var streams = AWS.util.stream;
      var req = this;
      var stream = null;
  
      if (AWS.HttpClient.streamsApiVersion === 2) {
        stream = new streams.PassThrough();
        // Defer the send so the caller can attach listeners / pipe the returned
        // stream before the request starts.
        process.nextTick(function() { req.send(); });
      } else {
        stream = new streams.Stream();
        stream.readable = true;
  
        // Legacy streams: hold the request back until somebody actually
        // listens for 'data'. `stream.sent` makes sure that additional 'data'
        // listeners do not trigger a second send.
        stream.sent = false;
        stream.on('newListener', function(event) {
          if (!stream.sent && event === 'data') {
            stream.sent = true;
            process.nextTick(function() { req.send(); });
          }
        });
      }
  
      // Forward request-level errors to the consumer of the stream.
      this.on('error', function(err) {
        stream.emit('error', err);
      });
  
      this.on('httpHeaders', function streamHeaders(statusCode, headers, resp) {
        // Only responses with a status code below 300 are streamed; anything
        // else is left untouched by this handler.
        if (statusCode < 300) {
          // Replace the core data/error listeners for this request.
          // NOTE(review): HTTP_DATA / HTTP_ERROR are defined elsewhere;
          // presumably they buffer the body and build a retryable error —
          // confirm against AWS.EventListeners.Core.
          req.removeListener('httpData', AWS.EventListeners.Core.HTTP_DATA);
          req.removeListener('httpError', AWS.EventListeners.Core.HTTP_ERROR);
          req.on('httpError', function streamHttpError(error) {
            // Record the error on the response and mark it as not retryable.
            resp.error = error;
            resp.error.retryable = false;
          });
  
          // Decide whether the body length can be verified: the method must
          // not be HEAD and `content-length` must parse to a non-negative
          // integer.
          var shouldCheckContentLength = false;
          var expectedLen;
          if (req.httpRequest.method !== 'HEAD') {
            expectedLen = parseInt(headers['content-length'], 10);
          }
          if (expectedLen !== undefined && !isNaN(expectedLen) && expectedLen >= 0) {
            shouldCheckContentLength = true;
            // `var` is function-scoped, so this counter is visible to the
            // closures below; it stays undefined when the check is disabled.
            var receivedLen = 0;
          }
  
          // Runs when the body has been fully read: emits a
          // StreamContentLengthMismatch error if the byte count is wrong,
          // otherwise finishes the output stream (`end()` for the PassThrough
          // stream, a manual 'end' event for the legacy stream).
          var checkContentLengthAndEmit = function checkContentLengthAndEmit() {
            if (shouldCheckContentLength && receivedLen !== expectedLen) {
              stream.emit('error', AWS.util.error(
                new Error('Stream content length mismatch. Received ' +
                  receivedLen + ' of ' + expectedLen + ' bytes.'),
                { code: 'StreamContentLengthMismatch' }
              ));
            } else if (AWS.HttpClient.streamsApiVersion === 2) {
              stream.end();
            } else {
              stream.emit('end');
            }
          };
  
          // NOTE(review): createUnbufferedStream() is defined elsewhere; it is
          // used here as a readable stream of the raw response body — confirm.
          var httpStream = resp.httpResponse.createUnbufferedStream();
  
          if (AWS.HttpClient.streamsApiVersion === 2) {
            if (shouldCheckContentLength) {
              // Intermediate PassThrough whose `_write` is wrapped to count
              // the bytes flowing through before delegating to the original
              // implementation.
              var lengthAccumulator = new streams.PassThrough();
              lengthAccumulator._write = function(chunk) {
                if (chunk && chunk.length) {
                  receivedLen += chunk.length;
                }
                return streams.PassThrough.prototype._write.apply(this, arguments);
              };
  
              lengthAccumulator.on('end', checkContentLengthAndEmit);
              // If the output stream errors, stop verifying, detach the HTTP
              // stream and shut the accumulator down. The emitted 'end' runs
              // checkContentLengthAndEmit, which (with the check now disabled)
              // calls stream.end().
              stream.on('error', function(err) {
                shouldCheckContentLength = false;
                httpStream.unpipe(lengthAccumulator);
                lengthAccumulator.emit('end');
                lengthAccumulator.end();
              });
              // `end: false` keeps pipe() from ending `stream` on its own;
              // checkContentLengthAndEmit ends it after the length check.
              httpStream.pipe(lengthAccumulator).pipe(stream, { end: false });
            } else {
              // No length to verify: pipe straight through (pipe() ends
              // `stream` when the HTTP stream ends).
              httpStream.pipe(stream);
            }
          } else {
  
            // Legacy streams: count bytes (when verifying) and re-emit every
            // chunk on the output stream by hand.
            if (shouldCheckContentLength) {
              httpStream.on('data', function(arg) {
                if (arg && arg.length) {
                  receivedLen += arg.length;
                }
              });
            }
  
            httpStream.on('data', function(arg) {
              stream.emit('data', arg);
            });
            httpStream.on('end', checkContentLengthAndEmit);
          }
  
          // Transport-level failure: disable the length check so no mismatch
          // error is emitted afterwards, then surface the original error.
          httpStream.on('error', function(err) {
            shouldCheckContentLength = false;
            stream.emit('error', err);
          });
        }
      });
  
      return stream;
    },