app/nodejs/process.js (107 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. /* process.js - take data in raw_data bucket, process, and store in processed bucket. */ 'use strict'; import logger from './logger.js'; import fs from 'fs'; import os from 'os'; import path from 'path'; import csv from 'csv-parser'; import {Storage} from '@google-cloud/storage'; import config from './config.js'; const storage = new Storage(); /** * Download raw data from Cloud Storage into local file for processing * @return {string} Local filename containing downloaded data */ async function downloadRawData() { logger.info('downloadRawData: start downloading data'); if (!process.env.RAW_DATA_BUCKET) { throw new Error('RAW_DATA_BUCKET required'); } if (!process.env.PROCESSED_DATA_BUCKET) { throw new Error('PROCESSED_DATA_BUCKET required'); } const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'rawData')); const tempDataFile = path.join(tempDir, 'raw_data.csv'); logger.info(`downloadRawData: processing from ${process.env.RAW_DATA_BUCKET} ` + `to ${process.env.PROCESSED_DATA_BUCKET}`); await storage .bucket(process.env.RAW_DATA_BUCKET) .file(process.env.RAW_DATA_FILE || 'squirrels.csv') .download({destination: tempDataFile}); logger.info(`downloadRawData: downloaded data to ${tempDataFile}`); return tempDataFile; } /** * Process local file, producing aggregate data * @param {string} tempDataFile Local filename containing downloaded data * @return {object} Correlated aggregate data */ async function processRawData(tempDataFile) { logger.info('processRawData: start processing data'); return new Promise((resolve) => { const aggregate = {}; let ignoredRecords = 0; let countedRecords = 0; const results = []; fs.createReadStream(tempDataFile) .pipe(csv()) .on('data', (data) => results.push(data)) .on('end', () => { results.forEach(function(row) { let validRow = true; config.FACETS.forEach(function(facet) { if ((row[facet] == '') || (row[facet] == '?')) { ignoredRecords += 1; validRow = false; } }); if (validRow) { // build aggregate identifier const rowKeyValues = []; config.FACETS.forEach(function(facet) { rowKeyValues.push(row[facet]); }); const rowKey = rowKeyValues.join('/'); // Build the base data structure on first interaction if (!aggregate[rowKey]) { aggregate[rowKey] = {'_counter': 0}; config.SEGMENTS.forEach(function(segment) { if (!aggregate[rowKey][segment]) { aggregate[rowKey][segment] = 0; } }); } // Record the relevant data config.SEGMENTS.forEach(function(segment) { if (row[segment] == 'true') { aggregate[rowKey][segment] += 1; } }); // Increment counters aggregate[rowKey]['_counter'] += 1; countedRecords += 1; } }); logger.info(`processRawData: processed ${countedRecords} ` + `records, removed ${ignoredRecords}`); resolve(aggregate); }); }); } /** * Write aggregate data to Cloud Storage * @param {object} aggregate Aggregated data */ function writeProcessedData(aggregate) { logger.info('writeProcessedData: start writing'); let counter = 0; const processedBucket = storage.bucket(process.env.PROCESSED_DATA_BUCKET); const writeData = new Promise((resolve) => { Object.keys(aggregate).forEach(async function(rowKey) { const facetData = aggregate[rowKey]; processedBucket .file(`${rowKey}/data.json`) .save(JSON.stringify(facetData)); counter += 1; }); resolve(counter); }); writeData.then(function(counter) { logger.info(`writeProcessedData: wrote ${counter} files`); }); } const main = async () => { logger.info(`🟢 Start process.py with: ` + `${process.env.RAW_DATA_BUCKET},${process.env.PROCESSED_DATA_BUCKET}`); const dataFile = await downloadRawData(); processRawData(dataFile) .then(function(result) { writeProcessedData(result); }); }; main().catch((err) => { console.error(err); process.exit(1); });