in node/lib/kpl-agg.js [366:434]
RecordAggregator.prototype.aggregateRecords = function (records, forceFlush, onReadyCallback) {
const self = this;
const _onReadyCallback = onReadyCallback || this.onReadyCallback
records.forEach(function (record) {
let messageSize = calculateRecordSize(self, record)
if (!record.data) {
return callOnReadyCallback(new Error('Record.Data field is mandatory'), record, _onReadyCallback)
}
if (!record.partitionKey) {
return callOnReadyCallback(new Error('record.partitionKey field is mandatory'), record, _onReadyCallback)
}
if (common.debug) {
console.log("Current Pending Size: " +
self.putRecords.length + " records, " +
self.totalBytes + " bytes");
console.log("Next: " + messageSize + " bytes");
}
// if the size of this record would push us over the limit,
// then encode the current set
if (messageSize > KINESIS_MAX_PAYLOAD_BYTES) {
callOnReadyCallback(new Error('Input record (PK=' + record.partitionKey +
', EHK=' + record.explicitHashKey +
', SizeBytes=' + messageSize +
') is too large to fit inside a single Kinesis record.'), null, _onReadyCallback);
} else if ((self.totalBytes + messageSize) > KINESIS_MAX_PAYLOAD_BYTES) {
if (common.debug) {
console.log("calculated totalBytes=" + self.totalBytes);
}
callOnReadyCallback(null, self.putRecords, _onReadyCallback);
self.clearRecords();
// total size tracked is now the size of the current record
self.totalBytes = calculateRecordSize(self, record)
// current inflight becomes just this record
self.putRecords = [record];
} else {
// the current set of records is still within the kinesis
// max payload size so increment inflight/total bytes
self.putRecords.push(record);
self.totalBytes += messageSize;
}
if (!self.partitionKeyTable.hasOwnProperty(record.partitionKey)) {
// add the size of the partition key when encoded
self.partitionKeyTable[record.partitionKey] = self.partitionKeyCount;
self.partitionKeyCount += 1;
}
if (record.explicitHashKey &&
!self.explicitHashKeyTable
.hasOwnProperty(record.explicitHashKey)) {
// add the size of the explicit hash key when encoded
self.explicitHashKeyTable[record.explicitHashKey] = self.explicitHashKeyCount;
self.explicitHashKeyCount += 1;
}
});
if (forceFlush === true && self.putRecords.length > 0) {
callOnReadyCallback(null, this.putRecords, _onReadyCallback);
this.clearRecords();
}
};