function postToService()

in fanout.js [46:117]


/**
 * Sends a batch of records to one fan-out target.
 *
 * Records that are too large for the service are dropped (and reported as an
 * error), the remaining ones are grouped into blocks that respect the limits
 * declared in `serviceReference.definition.limits`, and each block is handed
 * to `definition.send` through an `async.queue`.
 *
 * @param {Object} serviceReference - Provides `definition` (with `limits` and
 *   `send(service, target, records, done)`), `service` and `dispose()`.
 * @param {Object} target - Target description; `id`, `type`, `parallel` and
 *   `collapse` are read here.
 * @param {Array<Object>} records - Records to send. Each record has a numeric
 *   `size` and, when `limits.includeKey` is set, a `key`. The array itself is
 *   not modified.
 * @param {*} stats - Not read by this function; kept so the signature stays
 *   compatible with existing callers.
 * @param {Function} callback - Invoked exactly once, asynchronously, after
 *   `serviceReference.dispose()`: with an Error when any record was dropped or
 *   any block failed to send, with `null` otherwise.
 */
function postToService(serviceReference, target, records, stats, callback) {
  var parallelPosters = target.parallel ? config.parallelPosters : 1;
  var errors = [];
  var completed = false;
  var definition = serviceReference.definition;
  var service = serviceReference.service;
  var limits = definition.limits;

  var maxRecords = limits.maxRecords;
  var maxSize = limits.maxSize;
  var maxUnitSize = limits.maxUnitSize;
  var includeKey = limits.includeKey;
  var listOverhead = limits.listOverhead;
  var recordOverhead = limits.recordOverhead;
  var interRecordOverhead = limits.interRecordOverhead;

  // Size of a record's payload in bytes. The key is measured in bytes (not in
  // UTF-16 code units) so that filtering and grouping agree with each other.
  function payloadSize(record) {
    return record.size + (includeKey ? Buffer.byteLength(record.key) : 0);
  }

  // Releases the service and reports the outcome; guarded so that the caller's
  // callback can never run twice.
  function finish() {
    if (completed) {
      return;
    }
    completed = true;
    serviceReference.dispose();
    callback((errors.length > 0) ? new Error("An error occured while pushing data to an AWS Service"): null);
  }

  // Filter invalid records
  var validRecords = records.filter(function (record) {
    if ((payloadSize(record) + listOverhead + recordOverhead) > maxUnitSize) {
      console.error("Record too large to be pushed to target '" + target.id + "' of type '" + target.type + "':\n", JSON.stringify(record));
      errors.push(new Error("Record too large, was removed"));
      return false;
    }
    return true;
  });

  // Group records per block for sending
  var maxRecordsPerBlock = (target.collapse !== null) && (target.collapse != "") && (target.collapse != "none") ? maxRecords : 1;
  var blocks = [];
  var block = [];
  var blockSize = listOverhead;
  validRecords.forEach(function (record) {
    var baseSize = payloadSize(record) + recordOverhead;
    // A separator is only paid between two records of the same block.
    var separator = block.length > 0 ? interRecordOverhead : 0;
    var overflows = (blockSize + baseSize + separator) > maxSize;
    var isFull = block.length >= maxRecordsPerBlock;

    // Never close an empty block: that would send an empty payload and the
    // record would not fit any better in a fresh block anyway.
    if (block.length > 0 && (overflows || isFull)) {
      blocks.push(block);
      block = [];
      blockSize = listOverhead;
      separator = 0;
    }

    blockSize += baseSize + separator;
    block.push(record);
  });
  if (block.length > 0) {
    blocks.push(block);
  }

  // With nothing to send the queue would never drain, so complete explicitly.
  // Deferred so the callback is asynchronous, as it is on the normal path
  // (assuming `definition.send` completes asynchronously).
  if (blocks.length === 0) {
    setImmediate(finish);
    return;
  }

  // Posts the blocks to the target services
  var queue = async.queue(function (task, done) {
    definition.send(service, target, task.records, done);
  }, parallelPosters);

  // NOTE(review): assigning `drain` as a property is the async v1/v2 API;
  // confirm the installed version before upgrading the library.
  queue.drain = finish;

  // Add all blocks to the queue
  blocks.forEach(function (records) {
    queue.push({ records: records }, function (err) {
      if (err) {
        errors.push(err);
        console.error("An error occured while pushing data to target '" + target.id + "' of type '" + target.type + "':", err);
      }
    });
  });
}