in fanout.js [130:174]
function sendMessages(eventSourceARN, target, event, stats, callback) {
if(config.debug) {
console.log("Processing target '" + target.id + "'");
}
var start = Date.now();
stats.addTick('targets#' + eventSourceARN);
stats.register('records#' + eventSourceARN + '#' + target.destination, 'Records', 'stats', 'Count', eventSourceARN, target.destination);
stats.addValue('records#' + eventSourceARN + '#' + target.destination, event.Records.length);
async.waterfall([
function(done) { services.get(target, done); },
function(serviceReference, done) {
var definition = serviceReference.definition;
if(definition.intercept) {
if(target.passthrough) {
transformation.transformRecords(event.Records, target, function(err, transformedRecords) {
transformedRecords.forEach(function(record) { record.data = record.data.toString('base64') });
interceptService(serviceReference, target, { Records: transformedRecords }, stats, done);
});
} else {
interceptService(serviceReference, target, event, stats, done);
}
} else if (definition.send) {
transformation.transformRecords(event.Records, target, function(err, transformedRecords) {
postToService(serviceReference, target, transformedRecords, stats, done);
});
} else {
done(new Error("Invalid module '" + target.type + "', it must export either an 'intercept' or a 'send' method"));
}
}
], function(err) {
if(err) {
console.error("Error while processing target '" + target.id + "': " + err);
callback(new Error("Error while processing target '" + target.id + "': " + err));
return;
}
var end = Date.now();
var duration = Math.floor((end - start) / 10) / 100;
if(config.debug) {
console.log("Target '" + target.id + "' for source '" + eventSourceARN + "' successfully processed in" , duration, "seconds with", event.Records.length,"records");
}
callback();
});
}