in marketing-analytics/activation/gmp-googleads-connector/src/tentacles.js [502:550]
getTransporter(timeout = 60, targetTopic = `${this.namespace}-push`) {
/** @type {!CloudFunctionNode8} */
const transportMessage = async (message, context) => {
const attributes = message.attributes || {};
const messageId = context.eventId;
if (!attributes.topic) {
this.logger.warn(`There is no source topic: ${messageId}`);
return TransportResult.NO_SOURCE_TOPIC;
}
const sourceTopic = attributes.topic;
const lockToken = nanoid();
const getLocked = await this.apiLockDao.getLock(sourceTopic, lockToken);
if (!getLocked) {
this.logger.warn(
`There is no available lock for ${sourceTopic}. QUIT.`);
return TransportResult.NO_LOCK;
}
const data = Buffer.from(message.data, 'base64').toString();
this.logger.debug(
`Get nudge message[${messageId}]:${data}. Transporting ${sourceTopic}`);
const result =
await this.passOneMessage_(sourceTopic, lockToken, timeout, targetTopic);
this.logger.debug(
`Nudge message[${messageId}] transport results: ${result}`);
if (result === TransportResult.TIMEOUT) {
await this.apiLockDao.unlock(sourceTopic, lockToken);
this.logger.info(`There is no new message in ${sourceTopic}. QUIT`);
return TransportResult.TIMEOUT;
}
if (result === TransportResult.DUPLICATED) {
await this.apiLockDao.unlock(sourceTopic, lockToken);
this.logger.warn(`Duplicated message[${messageId}] for ${sourceTopic}.`);
await this.nudge(
`Got a duplicated message[${messageId}], ahead next.`, attributes);
return TransportResult.DUPLICATED;
}
if (result === TransportResult.DONE) {
if (await this.apiLockDao.hasAvailableLock(sourceTopic)) {
this.logger.info(
`Continue as there is available lock ${sourceTopic}.`);
await this.nudge(
`Continue from message[${messageId}], more locks available.`,
attributes);
}
}
return result;
};
return transportMessage;
}