async sendDataInternal()

in marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/pubsub_message_send.js [91:143]


  async sendDataInternal(pubsub, records, messageId, config) {
    this.logger.debug(`Init Pub/Sub message sender with Debug Mode.`);
    const managedSend = this.getManagedSendFn(config);
    const topic = await pubsub.getOrCreateTopic(config.topic);
    /** This function send out one message with the given data. */
    const getSendSingleMessageFn = async (lines, batchId) => {
      if (lines.length !== 1) throw Error('Wrong number of Pub/Sub messages.');
    /** @const {!BatchResult} */ const batchResult = {
        numberOfLines: 1,
      };
      const args = JSON.parse(lines[0]);
      // JSON string values need to be escaped
      Object.keys(args).forEach((key) => {
        try {
          if (typeof args[key] === 'string') {
            args[key] = args[key].replace(/\n/g, '\\n').replace(/\"/g, '\\"');
          }
        } catch (error) {
          this.logger.error(key, typeof args[key]);
          this.logger.error(error);
        }
      })
      const originalMessage = typeof (config.message) === 'object'
        ? JSON.stringify(config.message) : config.message;
      const message = replaceParameters(originalMessage || '', args, true);
      const attributes = JSON.parse(
        replaceParameters(JSON.stringify(config.attributes || {}), args, true)
      );
      let retryTimes = 0;
      let errors = [];
      do {
        // Wait sometime (1s, 2s, 3s, ...) before each retry.
        if (retryTimes > 0) await wait(retryTimes * 1000);
        try {
          const messageId = await pubsub.publish(topic, message, attributes);
          this.logger.debug(
            `Send ${lines[0]} to ${config.topic} as ${messageId}.`);
          batchResult.result = true;
          return batchResult;
        } catch (error) {
          this.logger.error(
            `Pub/Sub message[${batchId}] failed: ${lines[0]}`, error);
          errors.push(error.message);
          retryTimes++;
        }
      } while (retryTimes <= RETRY_TIMES)
      batchResult.result = false;
      batchResult.errors = errors;
      batchResult.failedLines = lines;
      return batchResult;
    };
    return managedSend(getSendSingleMessageFn, records, messageId);
  }