getLoadFileAndNudgeFn()

in marketing-analytics/activation/gmp-googleads-connector/src/tentacles.js [162:217]


  getLoadFileAndNudgeFn(adhocConfig = {}) {
    /**
     * Loads the GCS file and sent to Pub/Sub as messages, then sends a 'nudge'
     * message to start the sendibng process.
     * @param {!ValidatedStorageFile} file Cloud Storage file information.
     * @return {!Promise<string|undefined>} The Id of TentaclesFile.
     */
    return async (file) => {
      const attributes = adhocConfig.attributes || getAttributes(file.name);
      attributes.gcs = getApiOnGcs().includes(attributes.api).toString();
      attributes.topic = getTopicNameByApi(this.namespace, attributes.api);
      this.logger.debug(`Attributes from [${file.name}]: `, attributes);
      const fileEntity = Object.assign({ attributes }, file);
      const fileId = (await this.tentaclesFileDao.createFile(fileEntity)).toString();
      this.logger.debug(`Incoming file is logged as [${fileId}].`);
      try {
        const { api, config } = attributes;
        if (!this.options.getApiHandler(api)) {
          throw new Error(`Unknown API: ${api}.`);
        }
        const apiConfig = adhocConfig.config ||
          await this.apiConfigDao.getConfig(api, config);
        if (!apiConfig) {
          throw new Error(`API[${api}] has unknown config: ${config}.`);
        }
        if (file.size === 0) {
          this.logger.warn(`Empty file: ${file.name}.`);
        }
        /** @type {TentaclesTask} */
        const taskBaseInfo = Object.assign({ fileId }, attributes);
        let result;
        if (attributes.gcs === 'true') {
          result = await this.sendFileInfoToMessage_(file, taskBaseInfo, apiConfig);
        } else {
          result = await this.sendDataToMessage_(file, taskBaseInfo, apiConfig);
        }
        await this.tentaclesFileDao.updateFile(fileId, result);
        // The whole file be held if some of its tasks failed.
        if (result.status === TentaclesFileStatus.ERROR) {
          throw new Error(`Errors in send out data.`);
        }
        const fullFilePath = `gs://${file.bucket}/${file.name}`;
        await this.nudge(`After publish of ${fullFilePath}`,
          { topic: attributes.topic });
        await this.tentaclesFileDao.updateFile(fileId, {
          status: TentaclesFileStatus.STARTED,
          startSendingTime: new Date(),
        });
        return fileId;
      } catch (error) {
        this.logger.error(`Error in ${file.name}: `, error);
        await this.tentaclesFileDao.logError(fileId, error.message);
        throw new Error(`File ${fileId} failed: ${error.message}`);
      }
    };
  }