ingestion/batch/configurationManager.js (220 lines of code) (raw):
/**
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
// Shared helpers from the cds-shared package. In this module StorageUtil is used for
// existence checks / content fetches against a bucket, and CommonUtil for extension checks.
const { StorageUtil, CommonUtil } = require('cds-shared');
const storageUtil = new StorageUtil();
const commonUtil = CommonUtil;
// File extensions accepted for data files (passed to commonUtil.isExtensionSupported).
const acceptable = ['csv', 'gz', 'txt', 'avro', 'json'];
const path = require("path");
const underscore = require("underscore");
// Local configuration; this module reads cfg.pathPrefix, the required first path component.
const cfg = require('./config');
/**
* @param {} options
* @param {} validateStorage Indicates if existence checks should be performed against files in Cloud Storage.
*/
async function validateOptions(options, validateStorage) {
let info = [];
let warn = [];
let errors = [];
if (!options.eventId) {
errors.push("options.eventId must be provided");
}
if (!options.bucketName) {
errors.push("options.bucketName must be provided");
}
let attributes;
if (!options.fileName) {
errors.push("options.fileName must be provided");
}
else {
attributes = parseDerivedFileAttributes(options);
console.log(`File attributes: ${JSON.stringify(attributes)}`);
// If file is archived, skip checks.
if (attributes && attributes.isDataFile) {
// options.fileName is defined
const pathParts = path.dirname(options.fileName).split("/").filter(Boolean);
console.log(`Path parts: ${pathParts}`);
if (pathParts.length !== 4) {
errors.push(`Path must contain 4 components for data files. Provided: '${pathParts}'. Path must start with '${cfg.pathPrefix}' and the data file must be in a directory named 'data'.`);
}
else {
// Path parts should contain n. IE: /${cfg.pathPrefix}/dataset/table/data, /${cfg.pathPrefix}/dataset/table/config
const first = underscore.first(pathParts);
const last = underscore.last(pathParts);
if (first !== cfg.pathPrefix) {
errors.push(`First level directory must be named '${cfg.pathPrefix}', current is '${first}'`);
}
if (last !== "data") {
errors.push(`Last level directory must be named 'data', current is '${last}'`);
}
}
if (!pathCheck(pathParts, 0, cfg.pathPrefix)) {
errors.push(`The first path component must be '${cfg.pathPrefix}' only`);
}
if (!pathCheck(pathParts, 3, "data")) {
errors.push("The fourth path component must be 'data' only");
}
if (!pathCheck(pathParts, 3, "config", false)) {
errors.push("The fourth path component must be 'config' only");
}
if (!pathCheck(pathParts, 4, "archive", false)) {
errors.push("The fifth path component must be 'archive' only");
}
const extensionSupported = commonUtil.isExtensionSupported(options.fileName, acceptable);
if (!extensionSupported) {
errors.push(`File extension '${path.extname(options.fileName)}' in fileName '${options.fileName}' is not supported`);
}
if (validateStorage) {
if (options.bucketName && extensionSupported) {
// Check for existence of a schema.json transform.sql file. If they don't exist, return warnings
const schemaConfig = attributes.schemaPath;
const transformConfig = attributes.transformPath;
const schemaConfigExists = await storageUtil.fileExists(options.bucketName, attributes.schemaPath);
const transformConfigExists = await storageUtil.fileExists(options.bucketName, attributes.transformPath);
if (schemaConfigExists) {
info.push(`Schema configuration found at '${schemaConfig}' in bucket: ${options.bucketName}`);
}
else {
warn.push(`Schema configuration not found at '${schemaConfig}' in bucket: ${options.bucketName}`);
}
if (transformConfigExists) {
info.push(`Transform configuration found at '${transformConfig}' in bucket: ${options.bucketName}`);
}
else {
warn.push(`Transform configuration not found at '${transformConfig}' in bucket: ${options.bucketName}`);
}
}
if (options.bucketName) {
const exists = await storageUtil.fileExists(options.bucketName, options.fileName);
if (!exists) {
errors.push(`File '${options.fileName}' not found in bucket: ${options.bucketName}`);
}
}
}
if (attributes.datasetId) {
if (attributes.datasetId.length > 1024) {
errors.push(`DatasetId '${attributes.datasetId}' exceeds maximum allowable length of 1024: ${attributes.datasetId.length}}`);
}
if (!attributes.datasetId.match(/^[A-Za-z0-9_]+$/g)) {
errors.push(`DatasetId '${attributes.datasetId}' name is invalid. See https://cloud.google.com/bigquery/docs/datasets for further information.`);
}
}
if (attributes.destinationTableId) {
if (attributes.destinationTableId.length > 1024) {
errors.push(`Destination tableId '${attributes.destinationTableId}' exceeds maximum allowable length of 1024: ${attributes.destinationTableId.length}}`);
}
if (!attributes.destinationTableId.match(/^[A-Za-z0-9_]+$/g)) {
errors.push(`Destination tableId '${attributes.destinationTableId}' name is invalid. See https://cloud.google.com/bigquery/docs/tables for further information.`);
}
}
}
}
if (attributes && attributes.isArchiveFile === true) {
console.log(`Ignoring archived file: '${options.fileName} in bucket: ${options.bucketName}'`);
return { isValid: false, hasException: false, isArchiveFile: true };
}
else if (attributes && attributes.isDirectoryPath === true) {
console.log(`Ignoring directory path: '${options.fileName} in bucket: ${options.bucketName}'`);
return { isValid: false, isDirectoryPath: true, hasException: false };
}
else if (attributes && attributes.isConfigFile === true) {
console.log(`Ignoring config file: '${options.fileName} in bucket: ${options.bucketName}'`);
return { isValid: false, isConfigFile: true, hasException: false };
}
else if (attributes && attributes.isDataFile === true && errors.length === 0) {
console.log(`Options validation succeeded: ${info.join(", ")}`);
return { isValid: true, isDataFile: true, info: info, warn: warn, hasException: false };
}
else {
console.log(`Options validation failed: ${errors.join(", ")}. If this is a data file, ensure that you place it within the proper path. IE: '/datashare/[dataset]/[table]/data/file.csv'`);
return { isValid: false, errors: errors, info: info, warn: warn, hasException: true };
}
}
/**
 * Derives identifiers, related object paths and classification flags from options.fileName.
 *
 * The directory part of the file name is split on '/'; the second and third components
 * become the dataset id and destination table id. The schema/transform paths point at a
 * sibling 'config' directory and the archive path at an 'archive' sub-directory of the
 * file's own directory. Classification compares path components case-insensitively
 * against cfg.pathPrefix, 'data', 'config' and 'archive'.
 *
 * @param {Object} options
 * @param {string} options.fileName Object path of the file within the bucket.
 * @returns {{datasetId: (string|undefined), destinationTableId: (string|undefined),
 *   schemaPath: string, transformPath: string, archivePath: string, isDataFile: boolean,
 *   isArchiveFile: boolean, isDirectoryPath: boolean, isConfigFile: boolean}}
 */
function parseDerivedFileAttributes(options) {
    // Bucket object names are '/'-separated on every platform, so use the POSIX flavour of
    // the path module; the host-specific one would emit backslashes on Windows.
    const posix = path.posix;
    const fileName = options.fileName;
    const bucketPath = posix.dirname(fileName);
    const basename = posix.basename(fileName);
    const pathParts = bucketPath.split("/").filter(Boolean);
    const lowered = pathParts.map((part) => part.toLowerCase());

    const underPrefix = lowered[0] === cfg.pathPrefix;
    const isDataFile = lowered.length === 4 && underPrefix && lowered[3] === "data";
    const isConfigFile = lowered.length === 4 && underPrefix && lowered[3] === "config";
    const isArchived = lowered.length === 5 && underPrefix && lowered[3] === "data" && lowered[4] === "archive";

    return {
        datasetId: pathParts[1],
        destinationTableId: pathParts[2],
        schemaPath: posix.join(bucketPath, "..", "config", "schema.json"),
        transformPath: posix.join(bucketPath, "..", "config", "transform.sql"),
        archivePath: posix.join(bucketPath, "archive", basename),
        isDataFile: isDataFile,
        isArchiveFile: isArchived,
        // A trailing slash marks a directory placeholder object rather than a file.
        isDirectoryPath: fileName.endsWith("/"),
        isConfigFile: isConfigFile
    };
}
/**
 * Builds the ingestion configuration for a file.
 *
 * If a schema file exists at the derived schema path and has content, that content is
 * parsed as JSON and used as the base configuration, with its metadata replaced by the
 * result of setMetadataDefaults. Runtime properties derived from the options and the file
 * path are then written on top.
 *
 * @param {Object} options Options carrying bucketName, fileName and eventId.
 * @returns {Promise<Object>} The assembled configuration.
 */
async function getConfiguration(options) {
    const derived = parseDerivedFileAttributes(options);
    const { schemaPath, transformPath, archivePath } = derived;

    let config = {};
    if ((await storageUtil.fileExists(options.bucketName, schemaPath)) === true) {
        const rawSchema = await storageUtil.fetchFileContent(options.bucketName, schemaPath);
        if (rawSchema) {
            // The schema file supplies the base dictionary (destination, metadata, truncate, ...).
            config = JSON.parse(rawSchema);
            // Fill in default metadata values where the file left them out.
            config.metadata = setMetadataDefaults(config);
        }
    }

    // Properties computed at runtime; they override same-named keys from the schema file.
    Object.assign(config, {
        datasetId: derived.datasetId,
        destinationTableId: derived.destinationTableId,
        stagingTable: `TMP_${derived.destinationTableId}_${options.eventId}`,
        sourceFile: options.fileName,
        bucket: options.bucketName,
        eventId: options.eventId,
        bucketPath: {
            schema: schemaPath,
            transform: transformPath,
            archive: archivePath
        }
    });

    console.log(`Configuration: ${JSON.stringify(config)}`);
    return config;
}
/**
 * Applies default values to the metadata section of a configuration dictionary.
 *
 * Defaults: sourceFormat 'CSV'; skipLeadingRows 1 for CSV sources when not configured;
 * maxBadRecords 0. When dict.metadata exists it is updated in place and returned;
 * otherwise a new object is created.
 *
 * @param {Object} dict Configuration dictionary, optionally holding a `metadata` object.
 * @returns {Object} The metadata object with defaults applied.
 */
function setMetadataDefaults(dict) {
    let meta = dict.metadata;
    if (!meta) {
        console.log("No metadata found");
        meta = {};
    }
    if (!meta.sourceFormat) {
        meta.sourceFormat = 'CSV';
    }
    // An explicit 0 means "no header row to skip" and must be respected; a plain truthiness
    // test would silently turn it into 1 and drop the first data row.
    const skipConfigured = Boolean(meta.skipLeadingRows) || meta.skipLeadingRows === 0;
    if (meta.sourceFormat.toLowerCase() === 'csv' && !skipConfigured) {
        meta.skipLeadingRows = 1;
    }
    if (!meta.maxBadRecords) {
        meta.maxBadRecords = 0;
    }
    if (process.env.VERBOSE_MODE) {
        console.log(`Using metadata: ${JSON.stringify(meta)}`);
    }
    return meta;
}
/**
 * Checks that a reserved path component, when present, sits at its expected position.
 *
 * Path parts are lower-cased before comparison; `component` is compared as given.
 *
 * @param {string[]} pathParts Components of the directory path.
 * @param {number} expectedIndex Index at which `component` is expected.
 * @param {string} component Component name to look for.
 * @param {boolean} [isRequired] When truthy, the absence of `component` (or a path too
 *   short to reach `expectedIndex`) fails the check.
 * @returns {boolean} true when the component is absent and not required, or when it appears
 *   exactly once and at `expectedIndex`; false otherwise.
 */
function pathCheck(pathParts, expectedIndex, component, isRequired) {
    const normalized = pathParts.map((part) => part.toLowerCase());
    const reachable = normalized.length > expectedIndex;

    // Nothing to verify: the outcome depends only on whether the component was mandatory.
    if (!reachable || !normalized.includes(component)) {
        return !isRequired;
    }

    const occurrences = normalized.filter((part) => part === component).length;
    return normalized[expectedIndex] === component && occurrences <= 1;
}
module.exports = {
validateOptions,
getConfiguration
};
if (process.env.UNIT_TESTS) {
module.exports.setMetadataDefaults = setMetadataDefaults;
}