in node/example/sample-deaggregation.js [64:113]
// Deaggregate every Kinesis input record in event.Records, collecting the
// extracted user records, then report the overall outcome through finish().
handleNoProcess(event, function() {
    var realRecords = [];
    console.log("Processing " + event.Records.length + " Kinesis Input Records");

    // process all records in parallel
    async.map(event.Records, function(record, asyncCallback) {
        // asyncCallback must be invoked exactly once per Kinesis record.
        // Both deaggregate callbacks below can report for the same record
        // (an error in the per-record callback followed by the after-record
        // callback), so completion is routed through a once-only guard.
        var completed = false;
        var completeRecord = function(err) {
            if (completed) {
                return;
            }
            completed = true;
            asyncCallback(err);
        };

        // use the async deaggregate interface: the per-record callback
        // appends each user record to realRecords, and the after-record
        // callback marks the kinesis record as completed within the
        // async map operation
        deagg.deaggregate(record.kinesis, computeChecksums, function(err, userRecord) {
            if (err) {
                console.log("Error on Record: " + err);
                // fail this kinesis record right away; any later completion
                // signal for the same record is ignored by the guard
                completeRecord(err);
                return;
            }

            // Buffer.from replaces the deprecated `new Buffer(string, encoding)`
            var recordData = Buffer.from(userRecord.data, 'base64');
            console.log("Per Record Callback Invoked with Record: " + recordData.toString('ascii'));
            realRecords.push(userRecord);
            // you can do something else with each kinesis user
            // record here!
        }, function(err) {
            if (err) {
                console.log(err);
            }
            // reflect that the kinesis message is completed (no-op if the
            // per-record callback already reported an error for it)
            completeRecord(err);
        });
    }, function(err, results) {
        // called once all kinesis records have been processed by async.map,
        // or as soon as one of them reports an error
        if (debug) {
            console.log("Kinesis Record Processing Completed");
            console.log("Processed " + realRecords.length + " Kinesis User Records");
        }
        if (err) {
            finish(event, context, error, err);
        } else {
            finish(event, context, ok, "Success");
        }
    });
});