function sendMessages()

in fanout.js [130:174]


function sendMessages(eventSourceARN, target, event, stats, callback) {
  if(config.debug) {
    console.log("Processing target '" + target.id + "'");
  }

  var start = Date.now();
  stats.addTick('targets#' + eventSourceARN);
  stats.register('records#' + eventSourceARN + '#' + target.destination, 'Records', 'stats', 'Count', eventSourceARN, target.destination);
  stats.addValue('records#' + eventSourceARN + '#' + target.destination, event.Records.length);

  async.waterfall([
      function(done) { services.get(target, done); },
      function(serviceReference, done) { 
        var definition = serviceReference.definition;
        if(definition.intercept) {
          if(target.passthrough) {
            transformation.transformRecords(event.Records, target, function(err, transformedRecords) {
              transformedRecords.forEach(function(record) { record.data = record.data.toString('base64') });
              interceptService(serviceReference, target, { Records: transformedRecords }, stats, done);
            });
          } else {
            interceptService(serviceReference, target, event, stats, done);
          }
        } else if (definition.send) {
          transformation.transformRecords(event.Records, target, function(err, transformedRecords) {
            postToService(serviceReference, target, transformedRecords, stats, done);
          });
        } else {
          done(new Error("Invalid module '" + target.type + "', it must export either an 'intercept' or a 'send' method"));
        }
      }
    ], function(err) {
      if(err) {
        console.error("Error while processing target '" + target.id + "': " + err);
        callback(new Error("Error while processing target '" + target.id + "': " + err));
        return;
      }
      var end = Date.now();
      var duration = Math.floor((end - start) / 10) / 100;
      if(config.debug) {
        console.log("Target '" + target.id + "' for source '" + eventSourceARN + "' successfully processed in" , duration, "seconds with", event.Records.length,"records");
      }
      callback();
    });
}