getTransporter()

in marketing-analytics/activation/gmp-googleads-connector/src/tentacles.js [502:550]


  getTransporter(timeout = 60, targetTopic = `${this.namespace}-push`) {
    /** @type {!CloudFunctionNode8} */
    const transportMessage = async (message, context) => {
      const attributes = message.attributes || {};
      const messageId = context.eventId;
      if (!attributes.topic) {
        this.logger.warn(`There is no source topic: ${messageId}`);
        return TransportResult.NO_SOURCE_TOPIC;
      }
      const sourceTopic = attributes.topic;
      const lockToken = nanoid();
      const getLocked = await this.apiLockDao.getLock(sourceTopic, lockToken);
      if (!getLocked) {
        this.logger.warn(
          `There is no available lock for ${sourceTopic}. QUIT.`);
        return TransportResult.NO_LOCK;
      }
      const data = Buffer.from(message.data, 'base64').toString();
      this.logger.debug(
        `Get nudge message[${messageId}]:${data}. Transporting ${sourceTopic}`);
      const result =
        await this.passOneMessage_(sourceTopic, lockToken, timeout, targetTopic);
      this.logger.debug(
        `Nudge message[${messageId}] transport results: ${result}`);
      if (result === TransportResult.TIMEOUT) {
        await this.apiLockDao.unlock(sourceTopic, lockToken);
        this.logger.info(`There is no new message in ${sourceTopic}. QUIT`);
        return TransportResult.TIMEOUT;
      }
      if (result === TransportResult.DUPLICATED) {
        await this.apiLockDao.unlock(sourceTopic, lockToken);
        this.logger.warn(`Duplicated message[${messageId}] for ${sourceTopic}.`);
        await this.nudge(
          `Got a duplicated message[${messageId}], ahead next.`, attributes);
        return TransportResult.DUPLICATED;
      }
      if (result === TransportResult.DONE) {
        if (await this.apiLockDao.hasAvailableLock(sourceTopic)) {
          this.logger.info(
            `Continue as there is available lock ${sourceTopic}.`);
          await this.nudge(
            `Continue from message[${messageId}], more locks available.`,
            attributes);
        }
      }
      return result;
    };
    return transportMessage;
  }