in ingestion/batch/index.js [90:134]
async function processFile(options, throws) {
batchId = cloudFunctionUtil.generateBatchId(options.eventId, options.bucketName, options.fileName);
console.log(`processFile called for ${getBucketName(options)}, batchId is ${batchId}`);
const config = await configManager.getConfiguration(options);
const haveDataset = await bigqueryUtil.datasetExists(config.datasetId);
if (!haveDataset) {
console.log(`Dataset ${config.datasetId} not found, creating...`);
const options = {
labels: {}
};
options.labels[labelName] = "true";
await bigqueryUtil.createDataset(config.datasetId, options);
console.log(`Created dataset ${config.datasetId}`);
} else {
console.log(`Found dataset ${config.datasetId}`);
}
let success = false;
let ex;
try {
await stageFile(config);
await transform(config);
if (archiveEnabled === true) {
await storageUtil.moveFile(options.bucketName, config.sourceFile, config.bucketPath.archive);
console.log(`File '${config.sourceFile}' has been archived to: ${config.bucketPath.archive}`);
}
success = true;
}
catch (reason) {
ex = `Exception processing ${options.fileName}: ${reason}`;
console.error(ex);
}
finally {
await bigqueryUtil.deleteTable(config.datasetId, config.stagingTable, true);
}
if (throws && !success) {
throw ex;
}
return success;
}