in aws-blog-lambda-map-reduce-streaming/wordcount.js [25:86]
/**
 * Streams a single S3 object and counts the lines and words it contains.
 *
 * The object is fetched from `srcBucket`; keys ending in ".gz" are gunzipped
 * on the fly. After each line the elapsed time since `startTime` is checked,
 * and once it reaches MAX_EXEC_TIME seconds the splitter is ended early so
 * the partial counts are reported along with a warning message.
 *
 * @param {string} srcKey - Key of the object to read from `srcBucket`.
 * @param {function(?Error, ?Array)} cb - Node-style callback, invoked at most
 *     once: `cb(err, null)` on failure, otherwise
 *     `cb(null, [srcKey, totalWords, lineCount, warning])`, where `warning`
 *     is '' unless the time limit cut processing short.
 */
var countWords = function(srcKey, cb){
  var lineCount = 0;
  var totalWords = 0;
  var finished = false;
  var warning = '';

  // Deliver an error to the caller unless a result or error was already sent.
  var fail = function(err){
    if (finished) return;
    finished = true;
    cb(err, null);
  };

  try {
    // Create our processor that splits lines/words
    var lineSplitter = es.split();
    lineSplitter.on('data', function(line){
      lineCount++;
      // Count runs of word characters. Splitting on a single /\W/ counted the
      // empty strings between adjacent separators (e.g. ", ") and empty lines
      // as words.
      var words = line.match(/\w+/g);
      totalWords += words ? words.length : 0;
      // Check our execution time
      var timeDiff = process.hrtime(startTime);
      if (timeDiff[0] >= MAX_EXEC_TIME){
        var msg = 'Hit max execution time of ' + MAX_EXEC_TIME +
          ' seconds, sending incomplete results.';
        console.error(msg);
        warning = msg;
        // Ending the splitter lets the 'end' handler below report the
        // partial counts together with the warning.
        this.end();
      }
    })
    .on('error', function(err){
      console.error('Error in lineSplitter: ' + err);
      fail(err);
    })
    .on('end', function(){
      if (finished) return;
      // Mark completion before calling back so no later event can call cb again.
      finished = true;
      console.log('Finished ' + srcKey + ' with ' +
        totalWords + ' in ' + lineCount + ' lines');
      cb(null, [srcKey, totalWords, lineCount, warning]);
    });

    console.log('Downloading key ' + srcKey);
    // Create our pipeline. .pipe() does not forward 'error' events, so each
    // stage gets its own handler; otherwise a failed download or a corrupt
    // gzip stream would be unhandled and never reach cb.
    var download = s3.getObject({
      Bucket: srcBucket,
      Key: srcKey
    }).createReadStream();
    download.on('error', function(err){
      console.error('Error downloading ' + srcKey + ': ' + err);
      fail(err);
    });

    var input = download;
    // Only treat the object as gzip when the key actually ends in ".gz".
    if (/\.gz$/.test(srcKey)){
      var gunzip = zlib.createGunzip();
      gunzip.on('error', function(err){
        console.error('Error decompressing ' + srcKey + ': ' + err);
        fail(err);
      });
      input = download.pipe(gunzip);
    }
    input.pipe(lineSplitter);
  } catch (e) {
    console.error(e);
    fail(e);
  }
};