LambdaStream.prototype._invoke = function()

in aws-blog-lambda-map-reduce-streaming/streams.js [171:267]


/**
 * Invokes the `blog_cascade` Lambda function for a copy of the current batch
 * of keys and pushes the resulting progress rows onto this stream.
 *
 * The in-flight request is stored in `self._lambdas` under a fresh UUID and
 * removed again when the response arrives. `cb` is called synchronously, right
 * after the request has been issued; it does not wait for the Lambda to finish.
 *
 * When the response arrives:
 *  - on an invocation error, or a payload that is not valid JSON, the error is
 *    logged, re-emitted as an `'error'` event and `_checkDone()` is called;
 *  - otherwise the "Billed Duration" reported in the log tail is added to
 *    `_total_ms_taken`, the running totals are updated, one JSON-encoded row
 *    per result is pushed, warnings are forwarded through `logToClient()`, and
 *    `_checkDone()` is called.
 *
 * Payload shapes read by this method (as indexed below):
 *  - `_noAgg` set:   `[[rows, warnings], ...]`, each row having at least 3 items;
 *  - `_noAgg` unset: `[words, lines, ms, warnings]`.
 *
 * @param {Function} cb - Called with no arguments once the request is dispatched.
 */
LambdaStream.prototype._invoke = function(cb){
  var self = this;

  // Work on a copy so the completion callback reports the keys that were
  // actually sent, regardless of what happens to self._batch afterwards.
  var batch = self._batch.slice();
  var tail = 'Tail';
  var params = {
    FunctionName: 'blog_cascade',
    InvocationType: 'RequestResponse',
    LogType: tail, //None or Tail
    Payload: JSON.stringify({
      keys: batch,
      bucket: self._bucket,
      region: self._region,
      burst_rate: Math.floor((PROVISIONED_CONCURRENT_LAMBDAS - DEFAULT_MAX_CONCURRENT_LAMBDAS) / DEFAULT_MAX_CONCURRENT_LAMBDAS),
      batch_size: PROCESS_BATCH_SIZE,
      no_agg: self._noAgg
    })
  };

  self._keyCounter += batch.length;
  var lambda_id = uuid.v4();

  console.log('Invoking lambda ' + lambda_id +
    ' with outstanding ' + Object.keys(self._lambdas).length +
    ' and keys ' + self._keyCounter);
  self.logToClient('Invoking lambda ' + lambda_id);

  self._lambdas[lambda_id] = self._lambda.invoke(params, function(err, obj) {
    var i, j, len, jlen;

    // Mark that this is complete
    delete self._lambdas[lambda_id];
    console.log('Completed lambda ' + lambda_id +
      ' with outstanding ' + Object.keys(self._lambdas).length +
      ' and keys ' + self._keyCounter);
    self.logToClient('Lambda ' + lambda_id +
      ' finished keys ' + JSON.stringify(batch));

    // Decode the log tail before looking at `err`: the error branch reports
    // it too, and on failure `obj` (or its LogResult) may be missing.
    var log = '';
    if (tail === 'Tail' && obj && obj.LogResult){
      log = Buffer.from(obj.LogResult, 'base64').toString();
    }

    if (err){
      console.error('Error running Lambda: ' + err);
      self.logToClient('Lambda ' + lambda_id + ' error: ' + err + log);
      console.error('Log:', log);
      self.emit('error', err);
      self._checkDone();
      return;
    }

    var matches = log.match(/Billed Duration: (\d+) ms/);
    if (matches && matches.length > 1){
      self._total_ms_taken += parseInt(matches[1], 10);
    }

    self._totalKeys += batch.length;

    var result;
    try {
      result = JSON.parse(obj.Payload);
    }
    catch (parseErr){
      // Report a malformed payload through the same path as an invocation
      // failure instead of throwing inside the SDK callback, which would
      // also skip _checkDone().
      console.error('Error parsing Lambda payload: ' + parseErr);
      self.logToClient('Lambda ' + lambda_id +
        ' returned an unparseable payload: ' + parseErr);
      self.emit('error', parseErr);
      self._checkDone();
      return;
    }

    // Only inspect the first element when there is one; a null payload or a
    // null first element is treated as "nothing to report".
    var first = (result && result.length) ? result[0] : null;

    if (first != null && (first.length > 0 || first > 0)){
      if (self._noAgg){
        // For each Cascade
        for (i = 0, len = result.length; i < len; i++){
          var rows = result[i][0];
          var warnings = result[i][1];
          // For each Wordcount
          for (j = 0, jlen = rows.length; j < jlen; j++){
            var perLambdaResult = rows[j];
            self.push(JSON.stringify(
              [
                self._totalKeys,
                self._keyCounter,
                perLambdaResult[0],
                perLambdaResult[1],
                perLambdaResult[2]
              ]
            ));
          }
          for (j = 0, jlen = warnings.length; j < jlen; j++){
            self.logToClient('WARNING: ' + warnings[j]);
          }
        }
      }
      else {
        self._totalWords += parseInt(result[0], 10);
        self._totalLines += parseInt(result[1], 10);
        self._total_ms_taken += parseInt(result[2], 10);
        self.push(JSON.stringify(
          [self._totalKeys, self._keyCounter,
            self._totalWords, self._totalLines, self._total_ms_taken]
        ));
        for (i = 0, len = result[3].length; i < len; i++){
          self.logToClient('WARNING: ' + result[3][i]);
        }
      }
    }
    else if (first != null && first.errorMessage){
      self.logToClient('WARNING: ' + first.errorMessage);
    }
    self._checkDone();
  });
  cb();
};