in node/example/sample-deaggregation.js [125:167]
// Deaggregate every Kinesis input record in the event, log each contained
// user record, and report the overall outcome through finish().
handleNoProcess(event, function() {
    console.log("Processing " + event.Records.length + " Kinesis Input Records");

    // running count of user records across all input records; only reported
    // in the debug log once processing completes
    var totalUserRecords = 0;

    async.map(event.Records, function(record, asyncCallback) {
        // use the deaggregateSync interface which receives a single
        // callback with an error and an array of Kinesis Records
        deagg.deaggregateSync(record.kinesis, computeChecksums, function(err, userRecords) {
            if (err) {
                console.log(err);
                // propagate the failure so async.map reports it to the
                // final callback
                return asyncCallback(err);
            }

            console.log("Received " + userRecords.length + " Kinesis User Records");
            totalUserRecords += userRecords.length;

            // forEach rather than map: the loop is run only for its side
            // effects and no result array is needed
            userRecords.forEach(function(userRecord) {
                // the payload is decoded as base64; Buffer.from replaces
                // the deprecated `new Buffer(string, encoding)` constructor
                var recordData = Buffer.from(userRecord.data, 'base64');
                console.log("Kinesis Aggregated User Record:" + recordData.toString('ascii'));
                // you can do something else with each kinesis
                // user record here!
            });

            // call the async callback to reflect that the kinesis
            // message is completed
            asyncCallback(null);
        });
    }, function(err, results) {
        // function is called once all kinesis records have been processed
        // by async.map
        if (debug) {
            console.log("Completed processing " + totalUserRecords + " Kinesis User Records");
        }

        // NOTE(review): `error` and `ok` are status values defined outside
        // this excerpt - confirm their meaning against finish()
        if (err) {
            finish(event, context, error, err);
        } else {
            finish(event, context, ok, "Success");
        }
    });
});