in index.js [451:533]
/**
 * Forward every record of a stream event to its delivery stream(s).
 *
 * Steps, in order:
 *   1. resolve the default delivery stream for `streamName` from the
 *      `deliveryStreamMapping` cache;
 *   2. extract user records from each entry of `event.Records` (Kinesis
 *      records go through the KPL deaggregator, all other records are
 *      treated as DynamoDB update stream records and filtered by
 *      `writableEventTypes`);
 *   3. transform the extracted records;
 *   4. route the transformed records to one or more destinations;
 *   5. hand each destination's records to `processFinalRecords`.
 *
 * @param {Object} event - the invocation event; must carry a `Records` array.
 * @param {string} serviceName - source service of the records; compared
 *     against `KINESIS_SERVICE_NAME` to select the extraction path.
 * @param {string} streamName - source stream name; key into
 *     `deliveryStreamMapping` and passed through to `processFinalRecords`.
 * @param {Function} callback - invoked with an error if extraction,
 *     transformation or delivery fails, and with no arguments on success.
 */
function processEvent(event, serviceName, streamName, callback) {
    if (debug) {
        console.log('Processing event');
    }

    // look up the delivery stream name of the mapping cache
    const deliveryStreamName = deliveryStreamMapping[streamName];

    if (debug) {
        console.log("Forwarding " + event.Records.length + " " + serviceName + " records to Delivery Stream " + deliveryStreamName);
    }

    async.map(event.Records, function (record, recordCallback) {
        // resolve the record data based on the service
        if (serviceName === KINESIS_SERVICE_NAME) {
            // run the record through the KPL deaggregator
            deagg.deaggregateSync(record.kinesis, computeChecksums, function (err, userRecords) {
                // userRecords now has all the deaggregated user records, or
                // just the original record if no KPL aggregation is in use
                if (err) {
                    recordCallback(err);
                } else {
                    recordCallback(null, userRecords);
                }
            });
        } else if (writableEventTypes.includes(record.eventName)) {
            // dynamo update stream record with a writable event type
            if (debug) {
                console.log("Processing record: " + JSON.stringify(record) + " with event type: " + record.eventName + " when writable events are: " + writableEventTypes);
            }
            recordCallback(null, exports.createDynamoDataItem(record));
        } else {
            // dynamo update stream record we were asked not to forward;
            // null placeholders are filtered out before transformation
            if (debug) {
                console.log("Skipping record: " + JSON.stringify(record) + " with event type: " + record.eventName + " when writable events are: " + writableEventTypes);
            }
            recordCallback(null, null);
        }
    }, function (err, extractedUserRecords) {
        if (err) {
            callback(err);
            return;
        }

        // extractedUserRecords will be array[array[Object]] (plus null
        // placeholders for skipped records), so drop the nulls and
        // flatten to array[Object]
        const userRecords = [].concat.apply([], extractedUserRecords.filter((record) => record !== null));

        // transform the user records
        transform.transformRecords(serviceName, useTransformer, userRecords, function (transformErr, transformed) {
            if (transformErr) {
                // without transformed records there is nothing valid to
                // deliver, so report the failure instead of routing
                // an undefined payload and signalling success
                callback(transformErr);
                return;
            }

            // apply the routing function that has been configured
            router.routeToDestination(deliveryStreamName, transformed, useRouter, function (routingErr, routingDestinationMap) {
                let destinations = routingDestinationMap;

                if (routingErr) {
                    // we are still going to route to the default stream
                    // here, as a bug in routing implementation cannot
                    // result in lost data!
                    console.log(routingErr);
                }

                if (routingErr || destinations === undefined || destinations === null) {
                    // discard the delivery map we might have received and
                    // send everything to the default stream; a fresh
                    // object is used because the router may have supplied
                    // no map at all, or only a partial one
                    destinations = {};
                    destinations[deliveryStreamName] = transformed;
                }

                // send the routed records to the delivery processor
                async.map(Object.keys(destinations), function (destinationStream, asyncCallback) {
                    processFinalRecords(destinations[destinationStream], streamName, destinationStream, asyncCallback);
                }, function (deliveryErr, results) {
                    if (deliveryErr) {
                        callback(deliveryErr);
                        return;
                    }

                    if (debug) {
                        results.forEach(function (item) {
                            console.log(JSON.stringify(item));
                        });
                    }

                    callback();
                });
            });
        });
    });
}