in fanout.js [46:117]
/**
 * Posts a list of records to one target service, grouping them into blocks
 * that respect the limits declared by the service definition.
 *
 * Records whose individual size exceeds `limits.maxUnitSize` are dropped and
 * logged; dropping a record makes the final callback report an error.
 * Remaining records are grouped into blocks (a single record per block unless
 * `target.collapse` is set to something other than null, "" or "none") and
 * each block is handed to `definition.send` through an async queue.
 *
 * @param {Object} serviceReference - exposes `definition` (with `limits` and `send`), `service` and `dispose()`.
 * @param {Object} target - target configuration; `id`, `type`, `collapse` and `parallel` are read here.
 * @param {Array} records - records to send; each one exposes `size` and, when `limits.includeKey` is set, `key`.
 * @param {Object} stats - not used by this function; kept for interface compatibility.
 * @param {Function} callback - called exactly once with an Error if anything failed or was dropped, null otherwise.
 */
function postToService(serviceReference, target, records, stats, callback) {
  var parallelPosters = target.parallel ? config.parallelPosters : 1;
  var errors = [];
  var definition = serviceReference.definition;
  var service = serviceReference.service;
  var limits = definition.limits;
  var maxRecords = limits.maxRecords;
  var maxSize = limits.maxSize;
  var maxUnitSize = limits.maxUnitSize;
  var includeKey = limits.includeKey;
  var listOverhead = limits.listOverhead;
  var recordOverhead = limits.recordOverhead;
  var interRecordOverhead = limits.interRecordOverhead;

  // Payload size of a record, counting the key in bytes (not UTF-16 code units)
  // so that filtering and block sizing agree for multi-byte keys.
  function payloadSize(record) {
    return record.size + (includeKey ? Buffer.byteLength(record.key) : 0);
  }

  // Releases the service reference and reports the overall outcome.
  function finish() {
    serviceReference.dispose();
    callback((errors.length > 0) ? new Error("An error occured while pushing data to an AWS Service"): null);
  }

  // Filter invalid records
  var validRecords = records.filter(function (record) {
    if((payloadSize(record) + listOverhead + recordOverhead) > maxUnitSize) {
      console.error("Record too large to be pushed to target '" + target.id + "' of type '" + target.type + "':\n", JSON.stringify(record));
      errors.push(new Error("Record too large, was removed"));
      return false;
    }
    return true;
  });

  // Group records per block for sending
  var maxRecordsPerBlock = (target.collapse !== null) && (target.collapse != "") && (target.collapse != "none") ? maxRecords : 1;
  var blocks = [];
  var blockSize = listOverhead;
  var block = [];
  validRecords.forEach(function (record) {
    var baseSize = payloadSize(record) + recordOverhead;
    var recordSize = baseSize + (block.length > 0 ? interRecordOverhead : 0);
    // Only flush a non-empty block: an empty one would be sent as an empty batch
    if((block.length > 0) && (((blockSize + recordSize) > maxSize) || (block.length >= maxRecordsPerBlock))) {
      // Block full, start a new block
      blocks.push(block);
      block = [];
      blockSize = listOverhead;
      // First record of the new block carries no inter-record overhead
      recordSize = baseSize;
    }
    // Add the record to the records to send
    blockSize = blockSize + recordSize;
    block.push(record);
  });
  if(block.length > 0) {
    blocks.push(block);
  }

  // Nothing to send: the queue would never drain, so complete explicitly
  // (asynchronously, like the queue-driven path).
  if(blocks.length === 0) {
    setImmediate(finish);
    return;
  }

  // Posts the blocks to the target services
  var queue = async.queue(function(task, done) {
    definition.send(service, target, task.records, done);
  }, parallelPosters);
  queue.drain = finish;

  // Add all blocks to the queue
  blocks.forEach(function(blockToSend) {
    queue.push({ records: blockToSend }, function(err) {
      if(err) {
        errors.push(err);
        console.error("An error occured while pushing data to target '" + target.id + "' of type '" + target.type + "':", err);
      }
    });
  });
}