function processEvent()

in index.js [451:533]


/**
 * Extracts the user records from a stream event, transforms them, routes
 * them to one or more destination streams and hands each group of records
 * to processFinalRecords.
 *
 * @param {Object} event - the incoming event; its `Records` array is processed
 * @param {string} serviceName - source service of the event. Records are run
 *     through the KPL deaggregator when this equals KINESIS_SERVICE_NAME,
 *     otherwise they are handled as dynamo update stream records
 * @param {string} streamName - name of the source stream, used to look up the
 *     default delivery stream in deliveryStreamMapping
 * @param {Function} callback - invoked with an error if record extraction,
 *     transformation or final delivery fails, otherwise invoked without
 *     arguments once all destinations have been processed
 */
function processEvent(event, serviceName, streamName, callback) {
    if (debug) {
        console.log('Processing event');
    }
    // look up the delivery stream name of the mapping cache
    const deliveryStreamName = deliveryStreamMapping[streamName];

    if (debug) {
        console.log("Forwarding " + event.Records.length + " " + serviceName + " records to Delivery Stream " + deliveryStreamName);
    }

    // resolve the record data based on the service
    const extractRecord = function (record, recordCallback) {
        if (serviceName === KINESIS_SERVICE_NAME) {
            // run the record through the KPL deaggregator
            deagg.deaggregateSync(record.kinesis, computeChecksums, function (err, userRecords) {
                // userRecords now has all the deaggregated user records, or
                // just the original record if no KPL aggregation is in use
                if (err) {
                    recordCallback(err);
                } else {
                    recordCallback(null, userRecords);
                }
            });
            return;
        }

        // dynamo update stream record
        if (!writableEventTypes.includes(record.eventName)) {
            if (debug) {
                console.log("Skipping record: " + JSON.stringify(record) + " with event type: " + record.eventName + " when writable events are: " + writableEventTypes);
            }
            // null marks a skipped record; it is filtered out before flattening
            recordCallback(null, null);
            return;
        }

        if (debug) {
            console.log("Processing record: " + JSON.stringify(record) + " with event type: " + record.eventName + " when writable events are: " + writableEventTypes);
        }
        recordCallback(null, exports.createDynamoDataItem(record));
    };

    async.map(event.Records, extractRecord, function (err, extractedUserRecords) {
        if (err) {
            callback(err);
            return;
        }

        // extractedUserRecords will be array[array[Object]], so
        // flatten to array[Object]
        const userRecords = [].concat.apply([], extractedUserRecords.filter(record => record !== null));

        // transform the user records
        transform.transformRecords(serviceName, useTransformer, userRecords, function (err, transformed) {
            if (err) {
                // a failed transformation must be reported to the caller
                // instead of routing whatever `transformed` happens to hold
                callback(err);
                return;
            }

            // apply the routing function that has been configured
            router.routeToDestination(deliveryStreamName, transformed, useRouter, function (err, routingDestinationMap) {
                let destinations = routingDestinationMap;

                if (err) {
                    // we are still going to route to the default stream
                    // here, as a bug in routing implementation cannot
                    // result in lost data!
                    console.log(err);

                    // discard the delivery map we might have received (it
                    // may be missing or partial) and send everything to
                    // the default delivery stream only
                    destinations = {};
                    destinations[deliveryStreamName] = transformed;
                }

                // send the routed records to the delivery processor
                async.map(Object.keys(destinations), function (destinationStream, asyncCallback) {
                    const records = destinations[destinationStream];

                    processFinalRecords(records, streamName, destinationStream, asyncCallback);
                }, function (err, results) {
                    if (err) {
                        callback(err);
                        return;
                    }

                    if (debug) {
                        results.forEach(function (item) {
                            console.log(JSON.stringify(item));
                        });
                    }
                    callback();
                });
            });
        });
    });
}