in marketing-analytics/activation/gmp-googleads-connector/src/tentacles.js [275:334]
async sendDataToMessage_(file, taskBaseInfo, apiConfig) {
const { bucket, name: fileName, size: fileSize } = file;
const { size, topic } = taskBaseInfo;
const storageFile = this.options.getStorage(bucket, fileName);
const sizeAsFloat = parseFloat(size);
let dynamicSize;
if (sizeAsFloat === 0 || typeof size === 'undefined') {
dynamicSize =
await this.getSizeDynamically(storageFile, taskBaseInfo, apiConfig);
}
const messageMaxSize = Math.floor(1000 * 1000 *
getProperValue(dynamicSize || sizeAsFloat, MESSAGE_MAXIMUM_SIZE));
this.logger.info(`Split data size: ${messageMaxSize}`);
if (fileSize / 500 > messageMaxSize) {
this.logger.warn(`High risk to get timeout for ${fileName}`,
`file [${fileSize}], messageMaxSize[${messageMaxSize}]`);
}
const splitRanges =
await storageFile.getSplitRanges(fileSize, messageMaxSize);
this.logger.info('splitRanges', splitRanges);
const sendDataResult = {
numberOfTasks: splitRanges.length,
failedTasks: [],
};
let currentResult = true;
const batchSize = this.initTaskBatch;
for (let i = 0; i < Math.ceil(splitRanges.length / batchSize); i++) {
const startIndex = i * batchSize;
const batchRanges = splitRanges.slice(startIndex, startIndex + batchSize);
const results = await Promise.all(
batchRanges.map(async ([start, end], index) => {
const rawData = await storageFile.loadContent(start, end);
// Handle the rare case when there is a BOM at the beginning of the file.
// 'Use of a BOM is neither required nor recommended for UTF-8'.
// Source: http://www.unicode.org/versions/Unicode5.0.0/ch02.pdf
const data = (start === 0 && rawData.charCodeAt(0) === 0xFEFF)
? rawData.slice(1) : rawData;
const taskEntity = Object.assign(
{ start: start.toString(), end: end.toString() }, taskBaseInfo);
this.logger.debug(
`[${startIndex + index}] Send ${data.length} bytes to Topic[${topic}].`);
try {
const result =
await this.saveTaskAndSendData_(taskEntity, data, apiConfig);
if (!result) {
sendDataResult.failedTasks.push(
{ start, end, reason: 'Fail to update task.' });
}
return result;
} catch (error) {
sendDataResult.failedTasks.push({ start, end, reason: error.message })
}
}));
currentResult = currentResult && results.every((result) => result);
}
if (currentResult !== true) {
sendDataResult.status = TentaclesFileStatus.ERROR;
}
return sendDataResult;
}