in node/lib/kpl-agg.js [108:171]
/**
 * Packs a list of user records into one aggregated buffer.
 *
 * The result is the concatenation of `common.magic`, the message produced by
 * `common.AggregatedRecord.encode(...).finish()`, and the MD5 digest of that
 * encoded message.
 *
 * Each distinct partition key / explicit hash key is stored once in a key
 * table, and every record refers to its keys by table index.
 *
 * @param {Array<Object>} records - records to aggregate. For each record the
 *   function reads `partitionKey`, the optional `explicitHashKey`, and `data`
 *   (presumably a Buffer: the debug output calls `data.toString('base64')`).
 * @returns {Buffer} magic bytes + encoded message + MD5 checksum.
 */
function aggregateRecord(records) {
  if (common.debug) {
    console.log(`Protobuf Aggregation of ${records.length} records`);
  }

  // Maps are used instead of plain objects because a Map always iterates in
  // insertion order. Object.keys() lists integer-like keys ("1", "42", ...)
  // first in ascending numeric order, which would desynchronise the emitted
  // key tables from the indexes stored on each record. A Map is also immune to
  // keys such as "hasOwnProperty" or "__proto__".
  const partitionKeyIndexes = new Map();
  const explicitHashKeyIndexes = new Map();

  const putRecords = records.map((record) => {
    // String() mirrors the coercion a plain-object key lookup would perform.
    const partitionKey = String(record.partitionKey);
    if (!partitionKeyIndexes.has(partitionKey)) {
      partitionKeyIndexes.set(partitionKey, partitionKeyIndexes.size);
    }

    // The explicit hash key is optional; records without one keep an
    // undefined index.
    let explicitHashKeyIndex;
    if (record.explicitHashKey) {
      const explicitHashKey = String(record.explicitHashKey);
      if (!explicitHashKeyIndexes.has(explicitHashKey)) {
        explicitHashKeyIndexes.set(explicitHashKey, explicitHashKeyIndexes.size);
      }
      explicitHashKeyIndex = explicitHashKeyIndexes.get(explicitHashKey);
    }

    // the AggregatedRecord entry with partition and hash key indexes
    return {
      "partition_key_index": partitionKeyIndexes.get(partitionKey),
      "explicit_hash_key_index": explicitHashKeyIndex,
      data: record.data,
      tags: []
    };
  });

  // Position in each table equals the index handed out above.
  const partitionKeyTable = Array.from(partitionKeyIndexes.keys());
  const explicitHashKeyTable = Array.from(explicitHashKeyIndexes.keys());

  // encode the data
  const protoData = common.AggregatedRecord.encode({
    "partition_key_table": partitionKeyTable,
    "explicit_hash_key_table": explicitHashKeyTable,
    "records": putRecords
  });
  if (common.debug) {
    console.log(JSON.stringify({
      "partition_key_table": partitionKeyTable,
      "explicit_hash_key_table": explicitHashKeyTable,
      records: putRecords.map((record) => record.data.toString('base64'))
    }));
  }
  const bufferData = protoData.finish();

  // get the md5 for the encoded data
  const checksum = crypto.createHash('md5').update(bufferData).digest();

  // create the final object as a concatenation of the magic KPL number,
  // the encoded data records, and the md5 checksum
  const finalBuffer = Buffer.concat([common.magic, bufferData, checksum]);
  if (common.debug) {
    console.log(`Checksum: ${checksum.toString('base64')}`);
    console.log(`actual totalBytes=${bufferData.length}`);
    console.log(`final totalBytes=${finalBuffer.length}`);
  }
  return finalBuffer;
}