RecordAggregator.prototype.aggregateRecords = function()

in node/lib/kpl-agg.js [366:434]


RecordAggregator.prototype.aggregateRecords = function (records, forceFlush, onReadyCallback) {
	const self = this;
	const _onReadyCallback = onReadyCallback || this.onReadyCallback
	records.forEach(function (record) {

		let messageSize = calculateRecordSize(self, record)

		if (!record.data) {
			return callOnReadyCallback(new Error('Record.Data field is mandatory'), record, _onReadyCallback)
		}
		if (!record.partitionKey) {
			return callOnReadyCallback(new Error('record.partitionKey field is mandatory'), record, _onReadyCallback)
		}

		if (common.debug) {
			console.log("Current Pending Size: " +
				self.putRecords.length + " records, " +
				self.totalBytes + " bytes");
			console.log("Next: " + messageSize + " bytes");
		}

		// if the size of this record would push us over the limit,
		// then encode the current set
		if (messageSize > KINESIS_MAX_PAYLOAD_BYTES) {
			callOnReadyCallback(new Error('Input record (PK=' + record.partitionKey +
				', EHK=' + record.explicitHashKey +
				', SizeBytes=' + messageSize +
				') is too large to fit inside a single Kinesis record.'), null, _onReadyCallback);
		} else if ((self.totalBytes + messageSize) > KINESIS_MAX_PAYLOAD_BYTES) {
			if (common.debug) {
				console.log("calculated totalBytes=" + self.totalBytes);
			}
			callOnReadyCallback(null, self.putRecords, _onReadyCallback);
			self.clearRecords();

			// total size tracked is now the size of the current record
			self.totalBytes = calculateRecordSize(self, record)

			// current inflight becomes just this record
			self.putRecords = [record];
		} else {
			// the current set of records is still within the kinesis
			// max payload size so increment inflight/total bytes
			self.putRecords.push(record);
			self.totalBytes += messageSize;
		}

		if (!self.partitionKeyTable.hasOwnProperty(record.partitionKey)) {
			// add the size of the partition key when encoded
			self.partitionKeyTable[record.partitionKey] = self.partitionKeyCount;
			self.partitionKeyCount += 1;
		}

		if (record.explicitHashKey &&
			!self.explicitHashKeyTable
			.hasOwnProperty(record.explicitHashKey)) {
			// add the size of the explicit hash key when encoded
			self.explicitHashKeyTable[record.explicitHashKey] = self.explicitHashKeyCount;
			self.explicitHashKeyCount += 1;
		}

	});

	if (forceFlush === true && self.putRecords.length > 0) {
		callOnReadyCallback(null, this.putRecords, _onReadyCallback);
		this.clearRecords();
	}

};