in marketing-analytics/activation/gmp-googleads-connector/src/tentacles.js [162:217]
getLoadFileAndNudgeFn(adhocConfig = {}) {
/**
* Loads the GCS file and sent to Pub/Sub as messages, then sends a 'nudge'
* message to start the sendibng process.
* @param {!ValidatedStorageFile} file Cloud Storage file information.
* @return {!Promise<string|undefined>} The Id of TentaclesFile.
*/
return async (file) => {
const attributes = adhocConfig.attributes || getAttributes(file.name);
attributes.gcs = getApiOnGcs().includes(attributes.api).toString();
attributes.topic = getTopicNameByApi(this.namespace, attributes.api);
this.logger.debug(`Attributes from [${file.name}]: `, attributes);
const fileEntity = Object.assign({ attributes }, file);
const fileId = (await this.tentaclesFileDao.createFile(fileEntity)).toString();
this.logger.debug(`Incoming file is logged as [${fileId}].`);
try {
const { api, config } = attributes;
if (!this.options.getApiHandler(api)) {
throw new Error(`Unknown API: ${api}.`);
}
const apiConfig = adhocConfig.config ||
await this.apiConfigDao.getConfig(api, config);
if (!apiConfig) {
throw new Error(`API[${api}] has unknown config: ${config}.`);
}
if (file.size === 0) {
this.logger.warn(`Empty file: ${file.name}.`);
}
/** @type {TentaclesTask} */
const taskBaseInfo = Object.assign({ fileId }, attributes);
let result;
if (attributes.gcs === 'true') {
result = await this.sendFileInfoToMessage_(file, taskBaseInfo, apiConfig);
} else {
result = await this.sendDataToMessage_(file, taskBaseInfo, apiConfig);
}
await this.tentaclesFileDao.updateFile(fileId, result);
// The whole file be held if some of its tasks failed.
if (result.status === TentaclesFileStatus.ERROR) {
throw new Error(`Errors in send out data.`);
}
const fullFilePath = `gs://${file.bucket}/${file.name}`;
await this.nudge(`After publish of ${fullFilePath}`,
{ topic: attributes.topic });
await this.tentaclesFileDao.updateFile(fileId, {
status: TentaclesFileStatus.STARTED,
startSendingTime: new Date(),
});
return fileId;
} catch (error) {
this.logger.error(`Error in ${file.name}: `, error);
await this.tentaclesFileDao.logError(fileId, error.message);
throw new Error(`File ${fileId} failed: ${error.message}`);
}
};
}