exports.handler = function()

in fanout.js [217:270]


exports.handler = function(event, context) {
  var stats = statistics.create();
  stats.register('sources', 'Sources', 'counter', 'Count'); // source, destination
  stats.register('records', 'Records', 'counter', 'Count'); // source, destination

  if (config.debug) {
    console.log("Starting process of " + event.Records.length + " events");
  }

  // Group records per source ARN
  var sources = {};
  event.Records.forEach(function(record) {
    var eventSourceARN = record.eventSourceARN || record.TopicArn;
    if(! sources.hasOwnProperty(eventSourceARN)) {
      stats.addTick('sources');
      stats.register('records#' + eventSourceARN, 'Records', 'counter', 'Count', eventSourceARN);
      stats.register('targets#' + eventSourceARN, 'Targets', 'counter', 'Count', eventSourceARN);
      sources[eventSourceARN] = { Records: [record] };
    } else {
      sources[eventSourceARN].Records.push(record);
    }
    stats.addTick('records#' + eventSourceARN);
  });

  var eventSourceARNs = Object.keys(sources);
  var hasError = false;

  var queue = async.queue(function(eventSourceARN, callback) {
    async.waterfall([
        function(done) {  configuration.get(eventSourceARN, services.definitions, done); },
        function(targets, done) {  fanOut(eventSourceARN, sources[eventSourceARN], context, targets, stats, done); }
      ],
      callback);
  });

  queue.drain = function() {
    stats.publish(function() {
      if(hasError) {
        context.fail('Some processing errors occured, check logs'); // ERROR with message
      } else {
        context.succeed("Done processing all subscribers for this event, no errors detected"); // SUCCESS with message
      }
    });
  };

  eventSourceARNs.forEach(function(eventSourceARN) {
    queue.push(eventSourceARN, function(err) {
      if(err) {
        console.error("Error while processing events from source '" + eventSourceARN + "'", err);
        hasError = true;
      }
    })
  });
};