in fanout.js [217:270]
exports.handler = function(event, context) {
var stats = statistics.create();
stats.register('sources', 'Sources', 'counter', 'Count'); // source, destination
stats.register('records', 'Records', 'counter', 'Count'); // source, destination
if (config.debug) {
console.log("Starting process of " + event.Records.length + " events");
}
// Group records per source ARN
var sources = {};
event.Records.forEach(function(record) {
var eventSourceARN = record.eventSourceARN || record.TopicArn;
if(! sources.hasOwnProperty(eventSourceARN)) {
stats.addTick('sources');
stats.register('records#' + eventSourceARN, 'Records', 'counter', 'Count', eventSourceARN);
stats.register('targets#' + eventSourceARN, 'Targets', 'counter', 'Count', eventSourceARN);
sources[eventSourceARN] = { Records: [record] };
} else {
sources[eventSourceARN].Records.push(record);
}
stats.addTick('records#' + eventSourceARN);
});
var eventSourceARNs = Object.keys(sources);
var hasError = false;
var queue = async.queue(function(eventSourceARN, callback) {
async.waterfall([
function(done) { configuration.get(eventSourceARN, services.definitions, done); },
function(targets, done) { fanOut(eventSourceARN, sources[eventSourceARN], context, targets, stats, done); }
],
callback);
});
queue.drain = function() {
stats.publish(function() {
if(hasError) {
context.fail('Some processing errors occured, check logs'); // ERROR with message
} else {
context.succeed("Done processing all subscribers for this event, no errors detected"); // SUCCESS with message
}
});
};
eventSourceARNs.forEach(function(eventSourceARN) {
queue.push(eventSourceARN, function(err) {
if(err) {
console.error("Error while processing events from source '" + eventSourceARN + "'", err);
hasError = true;
}
})
});
};