function processFinalRecords()

in index.js [602:654]


/**
 * Forward a set of records to a Firehose delivery stream in batches.
 *
 * The records are partitioned with exports.getBatchRanges(), and each
 * resulting range is handed to exports.writeToFirehose() one after the other
 * via async.reduce, so that records from Kinesis appear in the Firehose
 * requests in the same order as they were received by this function.
 *
 * @param {Array} records - records to forward; each one becomes the `Data`
 *     member of an entry passed to writeToFirehose
 * @param {*} streamName - passed through unchanged to writeToFirehose
 * @param {*} deliveryStreamName - passed through unchanged to
 *     writeToFirehose, and echoed in the completion log line and result
 * @param {Function} callback - invoked as callback(err) when a batch fails,
 *     otherwise as callback(null, { deliveryStreamName, batchesDelivered,
 *     recordCount })
 */
function processFinalRecords(records, streamName, deliveryStreamName, callback) {
    if (debug) {
        console.log('Delivering records to destination Streams');
    }

    // work out which offset ranges of the record array make up each batch
    var batchRanges = exports.getBatchRanges(records);

    if (debug) {
        console.log(JSON.stringify(batchRanges));
    }

    // wrap a single record in the envelope expected by the Firehose API
    function toFirehoseEntry(data) {
        return {
            Data: data
        };
    }

    // reduce step: push one batch, then report the updated running count of
    // successfully delivered batches (unchanged when the write fails)
    function forwardBatch(deliveredSoFar, range, next) {
        if (debug) {
            console.log("Forwarding records " + range.lowOffset + ":" + range.highOffset + " - " + range.sizeBytes + " Bytes");
        }

        // slice() excludes the element at highOffset itself
        var entries = records.slice(range.lowOffset, range.highOffset).map(toFirehoseEntry);

        exports.writeToFirehose(entries, streamName, deliveryStreamName, function (err) {
            if (err) {
                next(err, deliveredSoFar);
                return;
            }
            next(null, deliveredSoFar + 1);
        });
    }

    // final step: log the outcome and hand it to the caller
    function onAllBatchesDone(err, successfulBatches) {
        if (err) {
            console.log("Forwarding failure after " + successfulBatches + " successful batches");
            callback(err);
            return;
        }

        console.log("Event forwarding complete. Forwarded " + successfulBatches + " batches comprising " + records.length + " records to Firehose " + deliveryStreamName);
        callback(null, {
            "deliveryStreamName": deliveryStreamName,
            "batchesDelivered": successfulBatches,
            "recordCount": records.length
        });
    }

    // reduce (rather than a parallel iterator) keeps the batches strictly
    // in order
    async.reduce(batchRanges, 0, forwardBatch, onAllBatchesDone);
}