in aws-blog-lambda-map-reduce-streaming/streams.js [171:267]
/**
 * Sends the currently buffered batch of keys to the `blog_cascade` Lambda
 * function and folds the response back into this stream.
 *
 * The request is tracked in `self._lambdas` under a fresh UUID until its
 * completion callback runs. `cb` is called right after the request has been
 * issued; it does not wait for the Lambda response.
 *
 * On completion the callback:
 *  - removes the request from `self._lambdas`;
 *  - on an invocation error, or a payload that is not valid JSON, emits
 *    'error' and calls `self._checkDone()`;
 *  - otherwise adds the "Billed Duration" found in the log tail to
 *    `self._total_ms_taken`, adds the batch size to `self._totalKeys`, and
 *    pushes JSON rows built from the parsed payload.
 *
 * Payload layout, as read by this code:
 *  - when `self._noAgg` is set: a list of `[rows, warnings]` pairs, where the
 *    first three elements of every row are pushed as one JSON row;
 *  - otherwise: `[words, lines, ms, warnings]`, accumulated into the running
 *    totals before a single JSON row is pushed.
 *
 * @param {Function} cb - Called with no arguments once the invocation has
 *   been dispatched.
 */
LambdaStream.prototype._invoke = function(cb){
    var self = this;
    // Copy the batch so the completion callback keeps its own view of the
    // keys that were sent with this request.
    var batch = self._batch.slice();
    var tail = 'Tail';
    var params = {
        FunctionName: 'blog_cascade',
        InvocationType: 'RequestResponse',
        LogType: tail, //None or Tail
        Payload: JSON.stringify({
            keys: batch,
            bucket: self._bucket,
            region: self._region,
            burst_rate: Math.floor((PROVISIONED_CONCURRENT_LAMBDAS - DEFAULT_MAX_CONCURRENT_LAMBDAS) / DEFAULT_MAX_CONCURRENT_LAMBDAS),
            batch_size: PROCESS_BATCH_SIZE,
            no_agg: self._noAgg
        })
    };
    self._keyCounter += batch.length;
    var lambda_id = uuid.v4();
    var message = 'Invoking lambda ' + lambda_id +
        ' with outstanding ' + Object.keys(self._lambdas).length +
        ' and keys ' + self._keyCounter;
    console.log(message);
    self.logToClient('Invoking lambda ' + lambda_id);
    self._lambdas[lambda_id] = self._lambda.invoke(params, function(err, obj) {
        // Mark that this is complete
        delete self._lambdas[lambda_id];
        console.log('Completed lambda ' + lambda_id +
            ' with outstanding ' + Object.keys(self._lambdas).length +
            ' and keys ' + self._keyCounter);
        self.logToClient('Lambda ' + lambda_id +
            ' finished keys ' + JSON.stringify(batch));

        // Decode the log tail before branching so the error path can report
        // it as well. It stays empty when the response carries no log (for
        // example when the invocation itself failed and `obj` is absent).
        var log = '';
        if (tail === 'Tail' && obj && obj.LogResult){
            log = Buffer.from(obj.LogResult, 'base64').toString();
        }

        if (err){
            console.error('Error running Lambda: ' + err);
            self.logToClient('Lambda ' + lambda_id + ' error: ' + err + log);
            console.error('Log:', log);
            self.emit('error', err);
            self._checkDone();
            return;
        }

        var matches = log.match(/Billed Duration: (\d+) ms/);
        if (matches && matches.length > 1){
            self._total_ms_taken += parseInt(matches[1], 10);
        }
        self._totalKeys += batch.length;

        var result;
        try {
            result = JSON.parse(obj.Payload);
        }
        catch (parseErr){
            // A malformed payload must not escape the callback, otherwise
            // _checkDone() is never reached for this batch.
            console.error('Error parsing Lambda payload: ' + parseErr);
            self.logToClient('Lambda ' + lambda_id + ' error: ' + parseErr);
            self.emit('error', parseErr);
            self._checkDone();
            return;
        }

        var i, j, len, jlen;
        if (result && result.length && (result[0].length > 0 || result[0] > 0)){
            if (self._noAgg){
                // For each Cascade
                for (i = 0, len = result.length; i < len; i++){
                    var rows = result[i][0];
                    var warnings = result[i][1];
                    // For each Wordcount
                    for (j = 0, jlen = rows.length; j < jlen; j++){
                        var perLambdaResult = rows[j];
                        self.push(JSON.stringify(
                            [
                                self._totalKeys,
                                self._keyCounter,
                                perLambdaResult[0],
                                perLambdaResult[1],
                                perLambdaResult[2]
                            ]
                        ));
                    }
                    for (j = 0, jlen = warnings.length; j < jlen; j++){
                        self.logToClient('WARNING: ' + warnings[j]);
                    }
                }
            }
            else {
                self._totalWords += parseInt(result[0], 10);
                self._totalLines += parseInt(result[1], 10);
                self._total_ms_taken += parseInt(result[2], 10);
                self.push(JSON.stringify(
                    [self._totalKeys, self._keyCounter,
                     self._totalWords, self._totalLines, self._total_ms_taken]
                ));
                for (i = 0, len = result[3].length; i < len; i++){
                    self.logToClient('WARNING: ' + result[3][i]);
                }
            }
        }
        else if (result && result.length && result[0].errorMessage){
            self.logToClient('WARNING: ' + result[0].errorMessage);
        }
        self._checkDone();
    });
    cb();
};