in index.js [218:285]
/**
 * Lambda entry point: validates the incoming event and, when it is
 * processable, hands it to exports.processEvent (after building the delivery
 * stream mapping for the stream if no entry is cached yet).
 *
 * The outcome is always reported through onCompletion, pre-bound here with
 * `context` and `event`, using the argument order (err, status, message).
 *
 * @param {Object} event - invocation event; `event.Records` is inspected and
 *            the first record's `eventSource`, `eventSourceARN` and optional
 *            `kinesis.kinesisSchemaVersion` drive validation and routing.
 * @param {Object} context - invocation context, forwarded to onCompletion
 *            and exports.buildDeliveryMap.
 */
function handler(event, context) {
    // add the context and event to the function that closes the lambda
    // invocation
    var finish = onCompletion.bind(undefined, context, event);

    /** End Runtime Functions */
    if (debug) {
        console.log(JSON.stringify(event));
    }

    // fail the function if the wrong event source type is being sent, or if
    // there is no data, etc
    var noProcessStatus = ERROR;
    var noProcessReason;
    var serviceName;

    if (!event.Records || event.Records.length === 0) {
        noProcessReason = "Event contains no Data";
        // not fatal - just got an empty event
        noProcessStatus = OK;
    } else {
        // there are records in this event; the first record is taken as
        // representative of the whole batch
        var firstRecord = event.Records[0];

        if (firstRecord.eventSource === KINESIS_SERVICE_NAME || firstRecord.eventSource === DDB_SERVICE_NAME) {
            serviceName = firstRecord.eventSource;
        } else {
            noProcessReason = "Invalid Event Source " + firstRecord.eventSource;
        }

        // currently hard coded around the 1.0 kinesis event schema
        if (firstRecord.kinesis && firstRecord.kinesis.kinesisSchemaVersion !== "1.0") {
            noProcessReason = "Unsupported Kinesis Event Schema Version " + firstRecord.kinesis.kinesisSchemaVersion;
        }
    }

    if (noProcessReason) {
        // terminate if there were any non process reasons. finish takes
        // (err, status, message): report the computed status rather than a
        // hard coded ERROR, so that an empty event completes as OK, and only
        // supply an error value when the status is not OK.
        var noProcessError = noProcessStatus === OK ? undefined : noProcessStatus;
        finish(noProcessError, noProcessStatus, noProcessReason);
        return;
    }

    init(function (err) {
        if (err) {
            finish(err, ERROR);
            return;
        }

        // parse the stream name out of the event
        var streamName = exports.getStreamName(event.Records[0].eventSourceARN);

        // create the processor to handle each record
        var processor = exports.processEvent.bind(undefined, event, serviceName, streamName, function (err) {
            if (err) {
                finish(err, ERROR, "Error Processing Records");
            } else {
                finish(undefined, OK);
            }
        });

        // NOTE(review): the `.length` test only has an effect if
        // deliveryStreamMapping is array-like; the keyed lookup suggests a
        // plain object map, in which case the second test does all the work
        // - confirm against its declaration.
        if (deliveryStreamMapping.length === 0 || !deliveryStreamMapping[streamName]) {
            // no delivery stream cached so far, so add this stream's tag
            // value to the delivery map, and continue with processEvent
            exports.buildDeliveryMap(streamName, serviceName, context, event, processor);
        } else {
            // delivery stream is cached so just invoke the processor
            processor();
        }
    });
}