in marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/pubsub_message_send.js [91:143]
async sendDataInternal(pubsub, records, messageId, config) {
this.logger.debug(`Init Pub/Sub message sender with Debug Mode.`);
const managedSend = this.getManagedSendFn(config);
const topic = await pubsub.getOrCreateTopic(config.topic);
/** This function send out one message with the given data. */
const getSendSingleMessageFn = async (lines, batchId) => {
if (lines.length !== 1) throw Error('Wrong number of Pub/Sub messages.');
/** @const {!BatchResult} */ const batchResult = {
numberOfLines: 1,
};
const args = JSON.parse(lines[0]);
// JSON string values need to be escaped
Object.keys(args).forEach((key) => {
try {
if (typeof args[key] === 'string') {
args[key] = args[key].replace(/\n/g, '\\n').replace(/\"/g, '\\"');
}
} catch (error) {
this.logger.error(key, typeof args[key]);
this.logger.error(error);
}
})
const originalMessage = typeof (config.message) === 'object'
? JSON.stringify(config.message) : config.message;
const message = replaceParameters(originalMessage || '', args, true);
const attributes = JSON.parse(
replaceParameters(JSON.stringify(config.attributes || {}), args, true)
);
let retryTimes = 0;
let errors = [];
do {
// Wait sometime (1s, 2s, 3s, ...) before each retry.
if (retryTimes > 0) await wait(retryTimes * 1000);
try {
const messageId = await pubsub.publish(topic, message, attributes);
this.logger.debug(
`Send ${lines[0]} to ${config.topic} as ${messageId}.`);
batchResult.result = true;
return batchResult;
} catch (error) {
this.logger.error(
`Pub/Sub message[${batchId}] failed: ${lines[0]}`, error);
errors.push(error.message);
retryTimes++;
}
} while (retryTimes <= RETRY_TIMES)
batchResult.result = false;
batchResult.errors = errors;
batchResult.failedLines = lines;
return batchResult;
};
return managedSend(getSendSingleMessageFn, records, messageId);
}