ingestion/batch/index.js

/**
 * Copyright 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

'use strict';

const { BigQueryUtil, CloudFunctionUtil, StorageUtil } = require('cds-shared');
const configManager = require('./configurationManager');
const cfg = require('./config');

const bigqueryUtil = new BigQueryUtil();
const cloudFunctionUtil = new CloudFunctionUtil();
const storageUtil = new StorageUtil();

const stagingTableExpiryDays = 2;
const batchIdColumnName = `${cfg.pathPrefix}_batch_id`;
const labelName = "datashare_managed";
const queryResultTimeoutMs = 540000;

// Set per-invocation in processFile. Note that module-level state is shared
// across invocations that reuse the same function instance.
let batchId;

const archiveEnabled = process.env.ARCHIVE_FILES
    ? (process.env.ARCHIVE_FILES.toLowerCase() === "true")
    : false;

/**
 * @param  {} event
 * @param  {} context
 */
exports.processEvent = async (event, context) => {
    console.log(`Event type: ${context.eventType}`);
    await processTriggerEvent(event, context);
};

/**
 * @param  {} request
 * @param  {} response
 */
exports.processHttpEvent = async (request, response) => {
    await processHttpEvent(request, response);
};

/**
 * @param  {} event
 * @param  {} context
 * For Cloud Storage finalize trigger.
 */
async function processTriggerEvent(event, context) {
    const options = {
        eventId: context.eventId,
        bucketName: event.bucket,
        fileName: event.name
    };
    const result = await configManager.validateOptions(options, true);
    if (result.hasException) {
        throw new Error(`Validation error for fileName: ${options.fileName}: ${JSON.stringify(result)}`);
    } else if (result.isValid) {
        await processFile(options, true);
    }
}

/**
 * @param  {} request
 * @param  {} response
 * For local debugging.
 */
async function processHttpEvent(request, response) {
    const options = request.body || {};
    const result = await configManager.validateOptions(options, true);
    if (!result.isValid || result.hasException) {
        response.status(400).send({ errors: result.errors });
        return;
    }
    const status = await processFile(options, false);
    const statusCode = (status === true) ? 200 : 400;
    response.status(statusCode).send();
}
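/*
 * Example request body for exercising processHttpEvent locally. The values
 * below are hypothetical; they just need to satisfy
 * configManager.validateOptions:
 *
 *   {
 *     "eventId": "1234567890",
 *     "bucketName": "my-ingestion-bucket",
 *     "fileName": "datashare/sales/data/orders.csv"
 *   }
 */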
/**
 * @param  {} options
 * @param  {} throws If true, rethrows any processing failure to the caller.
 */
async function processFile(options, throws) {
    batchId = cloudFunctionUtil.generateBatchId(options.eventId, options.bucketName, options.fileName);
    console.log(`processFile called for ${getBucketName(options)}, batchId is ${batchId}`);
    const config = await configManager.getConfiguration(options);
    const haveDataset = await bigqueryUtil.datasetExists(config.datasetId);
    if (!haveDataset) {
        console.log(`Dataset ${config.datasetId} not found, creating...`);
        // Named datasetOptions to avoid shadowing the function parameter.
        const datasetOptions = { labels: { [labelName]: "true" } };
        await bigqueryUtil.createDataset(config.datasetId, datasetOptions);
        console.log(`Created dataset ${config.datasetId}`);
    } else {
        console.log(`Found dataset ${config.datasetId}`);
    }

    let success = false;
    let ex;
    try {
        await stageFile(config);
        await transform(config);
        if (archiveEnabled === true) {
            await storageUtil.moveFile(options.bucketName, config.sourceFile, config.bucketPath.archive);
            console.log(`File '${config.sourceFile}' has been archived to: ${config.bucketPath.archive}`);
        }
        success = true;
    } catch (reason) {
        ex = `Exception processing ${options.fileName}: ${reason}`;
        console.error(ex);
    } finally {
        // Always clean up the staging table, even on failure.
        await bigqueryUtil.deleteTable(config.datasetId, config.stagingTable, true);
    }
    if (throws && !success) {
        // Wrap in Error rather than throwing a bare string.
        throw new Error(ex);
    }
    return success;
}

/**
 * @param  {} config
 * Executes the SQL transformation.
 */
async function transform(config) {
    const transformExists = await storageUtil.fileExists(config.bucket, config.bucketPath.transform);
    let transformQuery = "*";
    if (transformExists === true) {
        const transformContent = await storageUtil.fetchFileContent(config.bucket, config.bucketPath.transform);
        if (transformContent && transformContent.trim() !== '') {
            transformQuery = transformContent;
        }
    }
    const query = `SELECT ${transformQuery}, '${batchId}' AS ${batchIdColumnName} FROM \`${config.datasetId}.${config.stagingTable}\``;
    console.log(`Executing transform query: ${query}`);
    const [job] = await createTransformJob(config, query);
    // Block until the job completes; maxResults: 0 avoids fetching any rows.
    await job.getQueryResults({ maxApiCalls: 1, maxResults: 0, timeoutMs: queryResultTimeoutMs });

    console.log('Setting table label');
    // Label the table for managing and tracking
    await bigqueryUtil.setTableLabel(config.datasetId, config.destinationTableId, labelName, "true");
    console.log('Setting table label done');

    console.log(`Transform job: ${job.metadata.id} ${job.metadata.statistics.query.statementType} ${job.metadata.configuration.jobType} ${job.metadata.status.state}`);
}
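/*
 * Example (hypothetical) contents of the transform file read above. The file
 * supplies only the select list; transform() wraps it in SELECT ... FROM and
 * appends the batch id column:
 *
 *   CAST(order_id AS INT64) AS order_id,
 *   UPPER(region) AS region,
 *   PARSE_DATE('%Y-%m-%d', order_date) AS order_date
 */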
/**
 * @param  {} config
 * Loads data into BQ staging table.
 */
async function stageFile(config) {
    console.log(`Using config ${JSON.stringify(config)}`);
    const dataset = bigqueryUtil.getDataset(config.datasetId);

    // Expire the staging table automatically in case the cleanup in
    // processFile's finally block never runs.
    const today = new Date();
    today.setDate(today.getDate() + stagingTableExpiryDays);
    const expiryTime = today.getTime();
    console.log(`Setting expirationTime for staging table to ${expiryTime}`);

    const fields = (config.metadata && config.metadata.fields) || undefined;
    const options = { expirationTime: expiryTime };
    if (fields) {
        options.schema = fields;
    } else {
        options.autodetect = true;
    }
    await dataset.createTable(config.stagingTable, options);
    const table = dataset.table(config.stagingTable);
    console.log(`Created table ${config.stagingTable}`);
    console.log(`Executing load for ${config.sourceFile} with config: ${JSON.stringify(config)}`);
    try {
        const [job] = await table.load(storageUtil.getBucket(config.bucket).file(config.sourceFile), config.metadata || { autodetect: true });
        console.log(`${job.id} ${job.configuration.jobType} ${job.status.state} ${job.statistics.load.outputRows} rows`);
    } catch (ex) {
        console.error(`Errors encountered loading ${config.sourceFile} to ${config.stagingTable}`);
        logException(ex);
        throw ex;
    }
}

/**
 * @param  {} config
 * @param  {} query
 * Creates query job for the transformation query.
 */
async function createTransformJob(config, query) {
    console.log(`Configuration for runTransform: ${JSON.stringify(config)}`);

    // process.env.GCP_PROJECT is currently used by unit tests.
    // This var is not supported in Cloud Functions Node.js 10+ environments:
    // https://cloud.google.com/functions/docs/env-var#nodejs_10_and_subsequent_runtimes
    let projectId = null;
    if (!process.env.GCP_PROJECT) {
        const gcpMetadata = require('gcp-metadata');
        const isAvailable = await gcpMetadata.isAvailable();
        if (isAvailable === true) {
            // Project ID of the running instance.
            projectId = await gcpMetadata.project('project-id');
            console.log(`Project Id is: ${projectId}`);
        } else {
            console.log('gcpMetadata is unavailable, unable to determine projectId');
            throw new Error('Unable to determine GCP Project Id');
        }
    } else {
        projectId = process.env.GCP_PROJECT;
    }

    const options = {
        destinationTable: {
            projectId: projectId,
            datasetId: config.datasetId,
            tableId: config.destinationTableId
        },
        createDisposition: "CREATE_IF_NEEDED",
        writeDisposition: (config.truncate) ? "WRITE_TRUNCATE" : "WRITE_APPEND",
        query: query,
        jobPrefix: `${cfg.pathPrefix}_`,
        timePartitioning: {
            type: 'DAY'
        }
    };
    if (config.metadata && config.metadata.location) {
        options.location = config.metadata.location;
    }
    console.log(`BigQuery options: ${JSON.stringify(options)}`);
    try {
        // Await here so a rejected promise is caught by this try/catch;
        // returning the bare promise would bypass the catch block.
        return await bigqueryUtil.createQueryJob(options);
    } catch (exception) {
        console.error(`Exception encountered running transform: ${getExceptionString(exception)}`);
        logException(exception);
        throw exception;
    }
}

/**
 * @param  {} exception
 */
function logException(exception) {
    const errors = exception.errors;
    if (errors && errors.length > 0) {
        for (let i = 0; i < errors.length; i++) {
            console.error('ERROR ' + (i + 1) + ": " + JSON.stringify(errors[i].message));
        }
    } else {
        console.error(`Exception thrown, but no error array was given: ${exception}`);
    }
}

/**
 * @param  {} options
 */
function getBucketName(options) {
    return `gs://${options.bucketName}/${options.fileName}`;
}
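/*
 * Example (hypothetical) shape of config.metadata as read by stageFile and
 * createTransformJob. 'fields' follows the BigQuery schema field format; the
 * other keys are standard load job options:
 *
 *   {
 *     "sourceFormat": "CSV",
 *     "skipLeadingRows": 1,
 *     "location": "US",
 *     "fields": [
 *       { "name": "order_id", "type": "STRING", "mode": "NULLABLE" },
 *       { "name": "region", "type": "STRING", "mode": "NULLABLE" }
 *     ]
 *   }
 */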
/**
 * @param  {} exception
 * Returns the exception message in string format. Error instances (e.g.
 * TypeError: storageUtil.getBucket(...).file is not a function) stringify to
 * '{}' with JSON.stringify because their properties are not enumerable, so
 * fall back to String(exception) in that case.
 */
function getExceptionString(exception) {
    const str = JSON.stringify(exception);
    if (!str || str === '{}') {
        return String(exception);
    }
    return str;
}

if (process.env.UNIT_TESTS) {
    // Note: this replaces the public handler exports when running under unit tests.
    module.exports = { getExceptionString, getBucketName, processFile, labelName };
}
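/*
 * Minimal sketch of driving the HTTP entry point from a local script,
 * assuming GCP credentials are configured and the bucket/object exist
 * (the response object here is a stub, not a real Express response):
 *
 *   const index = require('./index');
 *   const response = {
 *     status(code) { console.log(`status: ${code}`); return this; },
 *     send(body) { if (body) console.log(body); }
 *   };
 *   index.processHttpEvent({ body: { eventId: '1', bucketName: 'my-bucket',
 *       fileName: 'datashare/sales/data/orders.csv' } }, response);
 */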