in marketing-analytics/activation/data-tasks-coordinator/src/tasks/multiple_task.js [276:330]
async startThroughHttp_(tag, records) {
const tasks = [];
const tentaclesFile = {};
// Generate a new file if there are more than one record for one task.
if (this.recordSize > 1) {
for (let i = 0; i < records.length; i += this.recordSize) {
const chunk = records.slice(i, i + this.recordSize);
tasks.push(JSON.stringify({ records: chunk.join('\n') }));
}
const sourceFile = this.config.source.file;
tentaclesFile.file = {
bucket: sourceFile.bucket,
name: sourceFile.name + '_for_multiple_task',
};
const outputFile = StorageFile.getInstance(
tentaclesFile.file.bucket, tentaclesFile.file.name, {
projectId: sourceFile.projectId,
keyFilename: sourceFile.keyFilename,
});
await outputFile.getFile().save(tasks.join('\n'));
} else {
tasks.push(...records);
tentaclesFile.file = this.config.source.file;
}
// To keep the TaskConfig simple, fill default fields here.
const tentaclesFileOption = lodash.merge({
service: {
projectId: '${projectId}',
locationId: '${locationId}',
namespace: '${namespace}',
},
attributes: { api: 'PB' },
config: {
topic: this.taskManager.getMonitorTopicName(),
attributes: {
taskId: this.config.destination.taskId,
[FIELD_NAMES.MULTIPLE_TAG]: tag,
}
}
}, this.config.destination.http);
const finalOption = JSON.parse(replaceParameters(
JSON.stringify(tentaclesFileOption), this.parameters, true));
const options = await getRequestOption(finalOption);
const file = getFileUrl(tentaclesFile);
const { attributes, config } = finalOption;
options.data = {
file,
attributes,
config,
};
const result = await requestWithRetry(options, this.logger);
const { fileId } = result;
this.parameters.tentaclesFileId = fileId;
return tasks.length;
}