getApiRequester()

in marketing-analytics/activation/gmp-googleads-connector/src/tentacles.js [617:718]


  getApiRequester() {
    /** @type {!CloudFunctionNode8} */
    const sendApiData = async (message, context) => {
      const messageId = context.eventId;
      const { attributes, data } = message;
      const records = Buffer.from(data, 'base64').toString();
      this.logger.debug(
        `Receive message[${messageId}] with ${records.length} bytes.`);
      const {
        api,
        config,
        apiConfigJson,
        dryRun,
        appended,
        taskId,
        topic,
        lockToken,
      } = attributes || {};
      /** @type {BatchResult} */ let batchResult;
      let needContinue = true;
      try {
        const canStartTask = await this.tentaclesTaskDao.start(taskId);
        if (!canStartTask) {// Wrong status task for duplicated message?
          this.logger.warn(`Quit before sending data. Duplicated message?`);
          return;
        }
        const apiHandler = this.options.getApiHandler(api);
        if (!apiHandler) throw new Error(`Unknown API: ${api}.`);
        const apiConfig = apiConfigJson ? JSON.parse(apiConfigJson) :
          await this.apiConfigDao.getConfig(api, config);
        if (!apiConfig) {
          throw new Error(`API[${api}] has unknown config: ${config}.`);
        }
        let finalConfig; // Dynamic config
        if (!appended) {
          finalConfig = apiConfig;
        } else {
          const parameters = JSON.parse(appended);
          const finalConfigString =
            replaceParameters(JSON.stringify(apiConfig), parameters, true);
          finalConfig = JSON.parse(finalConfigString);
        }
        if (dryRun === 'true') {
          this.logger.info(`[DryRun] API[${api}] and config[${config}]: `,
            finalConfig);
          batchResult = { result: true }; // A dry-run task always succeeds.
          if (!getApiOnGcs().includes(api)) {
            batchResult.numberOfLines = records.split('\n').length;
          }
        } else {
          batchResult =
            await apiHandler.sendData(records, messageId, finalConfig);
        }
        const { numberOfLines = 0, failedLines, groupedFailed } = batchResult;
        await this.tentaclesTaskDao.updateTask(taskId, {
          numberOfLines,
          numberOfFailed: failedLines ? failedLines.length : 0,
        });
        //If the record is empty, wait extra time to keep logs sequential.
        if (numberOfLines === 0) {
          await wait(1000);
        }
        if (groupedFailed) {
          const errorLogger = getLogger('TentaclesFailedRecord');
          Object.keys(groupedFailed).forEach((error) => {
            errorLogger.info(
              JSON.stringify(
                { taskId, error, records: groupedFailed[error] }));
          });
        }
        if (batchResult.result) {
          await this.tentaclesTaskDao.finish(taskId, batchResult.result);
        } else {
          await this.tentaclesTaskDao.logError(taskId, batchResult.errors);
        }
      } catch (error) {
        this.logger.error(`Error in API[${api}], config[${config}]: `, error);
        await this.tentaclesTaskDao.logError(taskId, error);
        needContinue = !error.message.startsWith('Unknown API');
      }
      if (!topic) {
        this.logger.info('There is no topic. In local file upload mode.');
        return;
      }
      if (!needContinue) {
        this.logger.info(`Skip unsupported API ${api}.`);
      }
      try {
        return this.releaseLockAndNotify(
          topic, lockToken, messageId, needContinue);
      } catch (error) {
        // Re-do this when unknown external exceptions happens.
        this.logger.error('Exception happened while try to release the lock: ',
          error);
        await wait(10000); // wait 10 sec
        this.logger.info('Wait 10 second and retry...');
        return this.releaseLockAndNotify(
          topic, lockToken, messageId, needContinue);
      }
    }
    return sendApiData;
  }