async passOneMessage_()

in marketing-analytics/activation/gmp-googleads-connector/src/tentacles.js [563:610]


  async passOneMessage_(sourceTopic, lockToken, timeout, targetTopic) {
    const subscriptionName = `${sourceTopic}-holder`;
    /**
     * Gets the message handler function for the pull subscription.
     * @param {function(*)} resolver Function to call when promise is fulfilled.
     * @return {function(!PubsubMessage):!Promise<!TransportResult>}
     */
    const getMessageHandler = (resolver) => {
      return async (message) => {
        const { id, length, attributes } = message;
        const messageTag = `[${id}]@[${sourceTopic}]`;  // For log.
        this.logger.debug(`Received ${messageTag} with data length: ${length}`);
        const taskId = attributes.taskId;
        attributes.lockToken = lockToken;
        const transported = await this.tentaclesTaskDao.transport(taskId);
        if (transported) {
          const messageId = await this.pubsub.publish(targetTopic,
            Buffer.from(message.data, 'base64').toString(), attributes);
          this.logger.debug(
            `Forward ${messageTag} as [${messageId}]@[${targetTopic}]`);
          await this.pubsub.acknowledge(subscriptionName, message.ackId);
          await this.tentaclesTaskDao.updateTask(taskId,
            { apiMessageId: messageId, lockToken });
          resolver(TransportResult.DONE);
        } else {
          this.logger.warn(
            `Wrong status for ${messageTag} (duplicated?) Task [${taskId}].`);
          await this.pubsub.acknowledge(subscriptionName, message.ackId);
          resolver(TransportResult.DUPLICATED);
        }
      };
    };
    const subscription = await this.pubsub.getOrCreateSubscription(
      sourceTopic, subscriptionName,
      { ackDeadlineSeconds: 300, flowControl: { maxMessages: 1 } });
    this.logger.debug(`Get subscription ${subscription.name}.`);
    const subscriber = new Promise((resolver) => {
      this.logger.debug(`Add messageHandler to Subscription:`, subscription);
      subscription.once(`message`, getMessageHandler(resolver));
    });
    const result = await Promise.race([
      subscriber,
      wait(timeout * 1000, TransportResult.TIMEOUT),
    ]);
    this.logger.debug(`Remove messageHandler after ${result}.`);
    subscription.removeAllListeners('message');
    return result;
  }