in marketing-analytics/activation/gmp-googleads-connector/src/tentacles.js [617:718]
getApiRequester() {
/** @type {!CloudFunctionNode8} */
const sendApiData = async (message, context) => {
const messageId = context.eventId;
const { attributes, data } = message;
const records = Buffer.from(data, 'base64').toString();
this.logger.debug(
`Receive message[${messageId}] with ${records.length} bytes.`);
const {
api,
config,
apiConfigJson,
dryRun,
appended,
taskId,
topic,
lockToken,
} = attributes || {};
/** @type {BatchResult} */ let batchResult;
let needContinue = true;
try {
const canStartTask = await this.tentaclesTaskDao.start(taskId);
if (!canStartTask) {// Wrong status task for duplicated message?
this.logger.warn(`Quit before sending data. Duplicated message?`);
return;
}
const apiHandler = this.options.getApiHandler(api);
if (!apiHandler) throw new Error(`Unknown API: ${api}.`);
const apiConfig = apiConfigJson ? JSON.parse(apiConfigJson) :
await this.apiConfigDao.getConfig(api, config);
if (!apiConfig) {
throw new Error(`API[${api}] has unknown config: ${config}.`);
}
let finalConfig; // Dynamic config
if (!appended) {
finalConfig = apiConfig;
} else {
const parameters = JSON.parse(appended);
const finalConfigString =
replaceParameters(JSON.stringify(apiConfig), parameters, true);
finalConfig = JSON.parse(finalConfigString);
}
if (dryRun === 'true') {
this.logger.info(`[DryRun] API[${api}] and config[${config}]: `,
finalConfig);
batchResult = { result: true }; // A dry-run task always succeeds.
if (!getApiOnGcs().includes(api)) {
batchResult.numberOfLines = records.split('\n').length;
}
} else {
batchResult =
await apiHandler.sendData(records, messageId, finalConfig);
}
const { numberOfLines = 0, failedLines, groupedFailed } = batchResult;
await this.tentaclesTaskDao.updateTask(taskId, {
numberOfLines,
numberOfFailed: failedLines ? failedLines.length : 0,
});
//If the record is empty, wait extra time to keep logs sequential.
if (numberOfLines === 0) {
await wait(1000);
}
if (groupedFailed) {
const errorLogger = getLogger('TentaclesFailedRecord');
Object.keys(groupedFailed).forEach((error) => {
errorLogger.info(
JSON.stringify(
{ taskId, error, records: groupedFailed[error] }));
});
}
if (batchResult.result) {
await this.tentaclesTaskDao.finish(taskId, batchResult.result);
} else {
await this.tentaclesTaskDao.logError(taskId, batchResult.errors);
}
} catch (error) {
this.logger.error(`Error in API[${api}], config[${config}]: `, error);
await this.tentaclesTaskDao.logError(taskId, error);
needContinue = !error.message.startsWith('Unknown API');
}
if (!topic) {
this.logger.info('There is no topic. In local file upload mode.');
return;
}
if (!needContinue) {
this.logger.info(`Skip unsupported API ${api}.`);
}
try {
return this.releaseLockAndNotify(
topic, lockToken, messageId, needContinue);
} catch (error) {
// Re-do this when unknown external exceptions happens.
this.logger.error('Exception happened while try to release the lock: ',
error);
await wait(10000); // wait 10 sec
this.logger.info('Wait 10 second and retry...');
return this.releaseLockAndNotify(
topic, lockToken, messageId, needContinue);
}
}
return sendApiData;
}