in marketing-analytics/activation/gmp-googleads-connector/src/tentacles.js [563:610]
async passOneMessage_(sourceTopic, lockToken, timeout, targetTopic) {
const subscriptionName = `${sourceTopic}-holder`;
/**
* Gets the message handler function for the pull subscription.
* @param {function(*)} resolver Function to call when promise is fulfilled.
* @return {function(!PubsubMessage):!Promise<!TransportResult>}
*/
const getMessageHandler = (resolver) => {
return async (message) => {
const { id, length, attributes } = message;
const messageTag = `[${id}]@[${sourceTopic}]`; // For log.
this.logger.debug(`Received ${messageTag} with data length: ${length}`);
const taskId = attributes.taskId;
attributes.lockToken = lockToken;
const transported = await this.tentaclesTaskDao.transport(taskId);
if (transported) {
const messageId = await this.pubsub.publish(targetTopic,
Buffer.from(message.data, 'base64').toString(), attributes);
this.logger.debug(
`Forward ${messageTag} as [${messageId}]@[${targetTopic}]`);
await this.pubsub.acknowledge(subscriptionName, message.ackId);
await this.tentaclesTaskDao.updateTask(taskId,
{ apiMessageId: messageId, lockToken });
resolver(TransportResult.DONE);
} else {
this.logger.warn(
`Wrong status for ${messageTag} (duplicated?) Task [${taskId}].`);
await this.pubsub.acknowledge(subscriptionName, message.ackId);
resolver(TransportResult.DUPLICATED);
}
};
};
const subscription = await this.pubsub.getOrCreateSubscription(
sourceTopic, subscriptionName,
{ ackDeadlineSeconds: 300, flowControl: { maxMessages: 1 } });
this.logger.debug(`Get subscription ${subscription.name}.`);
const subscriber = new Promise((resolver) => {
this.logger.debug(`Add messageHandler to Subscription:`, subscription);
subscription.once(`message`, getMessageHandler(resolver));
});
const result = await Promise.race([
subscriber,
wait(timeout * 1000, TransportResult.TIMEOUT),
]);
this.logger.debug(`Remove messageHandler after ${result}.`);
subscription.removeAllListeners('message');
return result;
}