async function processRawData()

in app/nodejs/process.js [68:128]


/**
 * Reads a CSV file and aggregates its rows by facet combination.
 *
 * Each row is keyed by the values of the `config.FACETS` columns joined with
 * '/'. For every key the result holds `_counter` (number of rows seen for
 * that key) plus one counter per `config.SEGMENTS` column, incremented when
 * the row's value for that column is the string 'true'.
 *
 * A row is skipped when any facet column is '' or '?'. Skipped rows are
 * counted once each, regardless of how many facets are invalid.
 *
 * @param {string} tempDataFile Path of the CSV file to read.
 * @return {Promise<Object<string, Object<string, number>>>} Resolves with the
 *     aggregate once the whole file has been parsed. Rejects if the file
 *     cannot be read or the CSV parser emits an error.
 */
async function processRawData(tempDataFile) {
  logger.info('processRawData: start processing data');

  return new Promise((resolve, reject) => {
    // A Map keeps data-derived keys such as 'constructor' or '__proto__'
    // from colliding with properties inherited from Object.prototype.
    const aggregate = new Map();
    let ignoredRecords = 0;
    let countedRecords = 0;

    const isMissing = (value) => value === '' || value === '?';

    const source = fs.createReadStream(tempDataFile);
    const parser = csv();

    // pipe() does not forward errors, so both streams need their own
    // listener; otherwise a missing file would never settle the promise.
    const fail = (err) => {
      source.destroy();
      parser.destroy();
      reject(err);
    };
    source.on('error', fail);
    parser.on('error', fail);

    source.pipe(parser)
        .on('data', (row) => {
          const facetValues = config.FACETS.map((facet) => row[facet]);

          if (facetValues.some(isMissing)) {
            ignoredRecords += 1;
            return;
          }

          // Build the base data structure on first interaction
          const rowKey = facetValues.join('/');
          let bucket = aggregate.get(rowKey);
          if (bucket === undefined) {
            bucket = {'_counter': 0};
            config.SEGMENTS.forEach((segment) => {
              bucket[segment] = 0;
            });
            aggregate.set(rowKey, bucket);
          }

          // Record the relevant data
          config.SEGMENTS.forEach((segment) => {
            if (row[segment] === 'true') {
              bucket[segment] += 1;
            }
          });

          // Increment counters
          bucket['_counter'] += 1;
          countedRecords += 1;
        })
        .on('end', () => {
          logger.info(`processRawData: processed ${countedRecords} ` +
          `records, removed ${ignoredRecords}`);

          // Callers receive a plain object, as before.
          resolve(Object.fromEntries(aggregate));
        });
  });
}