in app/nodejs/process.js [68:128]
/**
 * Aggregates the rows of a CSV file into per-facet-combination counters.
 *
 * Each row is keyed by its `config.FACETS` values joined with '/'. For every
 * key the result holds a `_counter` (number of rows seen for that key) and,
 * for each name in `config.SEGMENTS`, how many of those rows had that column
 * equal to the string 'true'. Rows in which any facet value is absent, empty
 * or '?' are skipped and counted once each as ignored.
 *
 * @param {string} tempDataFile - Path of the CSV file to read.
 * @return {Promise<Object<string, Object<string, number>>>} Resolves with the
 *     aggregate once the whole file has been read; rejects if reading or
 *     parsing the file emits an error.
 */
async function processRawData(tempDataFile) {
  logger.info('processRawData: start processing data');
  return new Promise((resolve, reject) => {
    // Buckets are kept in a Map while aggregating so that keys taken from the
    // file (e.g. 'constructor', '__proto__') can never resolve to members
    // inherited from Object.prototype.
    const buckets = new Map();
    let ignoredRecords = 0;
    let countedRecords = 0;

    const isMissing = (value) =>
      value === undefined || value === null || value === '' || value === '?';

    // Fold a single parsed row into the running aggregate.
    const processRow = (row) => {
      // A row is ignored exactly once, however many facets are unusable.
      if (config.FACETS.some((facet) => isMissing(row[facet]))) {
        ignoredRecords += 1;
        return;
      }

      // Build aggregate identifier
      const rowKey = config.FACETS.map((facet) => row[facet]).join('/');

      // Build the base data structure on first interaction
      let bucket = buckets.get(rowKey);
      if (bucket === undefined) {
        bucket = {'_counter': 0};
        config.SEGMENTS.forEach((segment) => {
          bucket[segment] = 0;
        });
        buckets.set(rowKey, bucket);
      }

      // Record the relevant data
      config.SEGMENTS.forEach((segment) => {
        if (row[segment] === 'true') {
          bucket[segment] += 1;
        }
      });

      // Increment counters
      bucket['_counter'] += 1;
      countedRecords += 1;
    };

    const source = fs.createReadStream(tempDataFile);
    const parser = csv();

    // pipe() does not forward errors, so both ends need their own listener;
    // otherwise a failure is never reported through the returned Promise.
    const fail = (err) => {
      source.destroy();
      reject(err);
    };
    source.on('error', fail);
    parser.on('error', fail);

    // Rows are aggregated as they arrive instead of being buffered first.
    source
        .pipe(parser)
        .on('data', processRow)
        .on('end', () => {
          logger.info(`processRawData: processed ${countedRecords} ` +
            `records, removed ${ignoredRecords}`);
          // fromEntries defines own data properties, so even a '__proto__'
          // key ends up as a regular entry of the returned plain object.
          resolve(Object.fromEntries(buckets));
        });
  });
}