in transformer.js [58:88]
/**
 * Runs the supplied transformer over every user record and collects the
 * transformed results, in input order, via async.map.
 *
 * When serviceName equals KINESIS_SERVICE_NAME, each record's `data` field is
 * decoded from base64 into a string using the module-level targetEncoding
 * before being handed to the transformer; for any other service the record is
 * passed to the transformer unchanged.
 *
 * @param {string} serviceName - identifier of the event source; compared
 *        against KINESIS_SERVICE_NAME.
 * @param {function} transformer - invoked as transformer(dataItem, cb) with an
 *        undefined `this`; must call cb(err) or cb(null, Buffer).
 * @param {Array} userRecords - records to transform.
 * @param {function} callback - called with (err) on the first failure, or with
 *        (null, transformedBuffers) once every record has been transformed.
 */
function transformRecords(serviceName, transformer, userRecords, callback) {
    async.map(userRecords, function(userRecord, userRecordCallback) {
        var dataItem;

        if (serviceName === KINESIS_SERVICE_NAME) {
            // Buffer.from replaces the deprecated `new Buffer(...)`, which
            // allocates a buffer of the given size (instead of decoding) when
            // handed a number. A record that cannot be decoded is reported
            // through the per-record callback rather than thrown out of the
            // iterator.
            try {
                dataItem = Buffer.from(userRecord.data, 'base64').toString(targetEncoding);
            } catch (decodeErr) {
                userRecordCallback(decodeErr);
                return;
            }
        } else {
            dataItem = userRecord;
        }

        transformer.call(undefined, dataItem, function(err, transformed) {
            if (err) {
                console.log(JSON.stringify(err));
                userRecordCallback(err);
                return;
            }

            if (transformed instanceof Buffer) {
                // hand the transformed Buffer back to the map as-is
                userRecordCallback(null, transformed);
            } else {
                // anything that is not a Buffer (including null/undefined)
                // is rejected as malformed transformer output
                userRecordCallback("Output of Transformer was malformed. Must be instance of Buffer or routable Object");
            }
        });
    }, function(err, transformed) {
        // all user records have been processed: surface the first error, or
        // pass the list of transformed Buffers to the caller. Partial results
        // are deliberately not forwarded alongside an error.
        if (err) {
            callback(err);
        } else {
            callback(null, transformed);
        }
    });
}