in index.js [602:654]
/**
 * Delivers the transformed records to the destination Firehose delivery
 * stream, splitting them into batches that respect the PutRecords
 * count/size limits (batch boundaries come from exports.getBatchRanges).
 *
 * @param {Array} records - Transformed record payloads, in the order they
 *   were received from Kinesis. Ordering is preserved in the Firehose
 *   request because batches are processed sequentially via async.reduce.
 * @param {string} streamName - Name of the originating Kinesis stream.
 * @param {string} deliveryStreamName - Target Firehose delivery stream name.
 * @param {Function} callback - Node-style callback. On failure receives the
 *   error; on success receives (null, { deliveryStreamName,
 *   batchesDelivered, recordCount }).
 */
function processFinalRecords(records, streamName, deliveryStreamName, callback) {
    if (debug) {
        console.log('Delivering records to destination Streams');
    }
    // get the set of batch offsets based on the transformed record sizes
    var batches = exports.getBatchRanges(records);
    if (debug) {
        console.log(JSON.stringify(batches));
    }
    // push to Firehose using PutRecords API at max record count or size.
    // This uses the async reduce method so that records from Kinesis will
    // appear in the Firehose PutRecords request in the same order as they
    // were received by this function
    async.reduce(batches, 0, function (successCount, batch, reduceCallback) {
        if (debug) {
            console.log("Forwarding records " + batch.lowOffset + ":" + batch.highOffset + " - " + batch.sizeBytes + " Bytes");
        }
        // grab the subset of records assigned to this batch and decorate
        // each one for the Firehose PutRecords API ({ Data: payload }).
        // (was: a side-effecting .map pushing into an array, with a callback
        // parameter that shadowed the outer batch variable `item`)
        var decorated = records.slice(batch.lowOffset, batch.highOffset).map(function (record) {
            return {
                Data: record
            };
        });
        exports.writeToFirehose(decorated, streamName, deliveryStreamName, function (err) {
            if (err) {
                // propagate the error; successCount stays at the number of
                // batches that made it through before the failure
                reduceCallback(err, successCount);
            } else {
                reduceCallback(null, successCount + 1);
            }
        });
    }, function (err, successfulBatches) {
        if (err) {
            console.log("Forwarding failure after " + successfulBatches + " successful batches");
            callback(err);
        } else {
            console.log("Event forwarding complete. Forwarded " + successfulBatches + " batches comprising " + records.length + " records to Firehose " + deliveryStreamName);
            callback(null, {
                "deliveryStreamName": deliveryStreamName,
                "batchesDelivered": successfulBatches,
                "recordCount": records.length
            });
        }
    });
}