in node/example/sample-aggregation.js [63:99]
// Once handleNoProcess invokes this callback: transform every input record
// with doSomething, hand the transformed records to aggregate, and call
// finish with 'Success' when aggregate signals that it is done.
handleNoProcess(event, () => {
  console.log(`Processing ${event.Records.length} Kinesis Input Records`);

  // process each provided Record in the event
  async.map(
    event.Records,
    (record, asyncCallback) => {
      // The iterator always reports success (null error), so the completion
      // callback below does not inspect its error argument.
      asyncCallback(null, doSomething(record.kinesis));
    },
    (mapErr, mapResults) => {
      // Aggregate the transformed records. The first handler runs for each
      // block of prepared messages (described by the original sample as
      // 1MB in size), the second when aggregation ends, the third on error.
      aggregate(
        mapResults,
        // NOTE(review): assumes aggregate invokes this handler as
        // (err, encodedRecord), matching the signature this sample declares,
        // and that encodedRecord.data is a string or Buffer — confirm
        // against the aggregate implementation.
        (err, encodedRecord) => {
          // Measure the payload itself: Buffer.byteLength rejects plain
          // objects, and the record is read as an object below.
          const encodedSize = Buffer.byteLength(encodedRecord.data);
          console.log(`Encoded records of size ${encodedSize} received`);

          // build putRecord params
          const params = {
            Data: encodedRecord.data,
            PartitionKey: encodedRecord.partitionKey,
            StreamName: streamName,
          };
          // Only forward an explicit hash key when the record carries one.
          if (encodedRecord.explicitHashKey) {
            params.ExplicitHashKey = encodedRecord.explicitHashKey;
          }

          // send to kinesis
          // kinesisClient.putRecord(params, ...)
        },
        () => {
          // aggregation end
          finish(event, context, ok, 'Success');
        },
        (err) => {
          // error occurs when aggregate records
          // NOTE(review): finish is not called on this path, so the
          // invocation is only logged as failed — confirm this is intended.
          console.log(`Error ${err}`);
        }
      );
    }
  );
});