async sendDataToMessage_()

in marketing-analytics/activation/gmp-googleads-connector/src/tentacles.js [275:334]


  async sendDataToMessage_(file, taskBaseInfo, apiConfig) {
    const { bucket, name: fileName, size: fileSize } = file;
    const { size, topic } = taskBaseInfo;
    const storageFile = this.options.getStorage(bucket, fileName);
    const sizeAsFloat = parseFloat(size);
    let dynamicSize;
    if (sizeAsFloat === 0 || typeof size === 'undefined') {
      dynamicSize =
        await this.getSizeDynamically(storageFile, taskBaseInfo, apiConfig);
    }
    const messageMaxSize = Math.floor(1000 * 1000 *
      getProperValue(dynamicSize || sizeAsFloat, MESSAGE_MAXIMUM_SIZE));
    this.logger.info(`Split data size: ${messageMaxSize}`);
    if (fileSize / 500 > messageMaxSize) {
      this.logger.warn(`High risk to get timeout for ${fileName}`,
        `file [${fileSize}], messageMaxSize[${messageMaxSize}]`);
    }
    const splitRanges =
      await storageFile.getSplitRanges(fileSize, messageMaxSize);
    this.logger.info('splitRanges', splitRanges);
    const sendDataResult = {
      numberOfTasks: splitRanges.length,
      failedTasks: [],
    };
    let currentResult = true;
    const batchSize = this.initTaskBatch;
    for (let i = 0; i < Math.ceil(splitRanges.length / batchSize); i++) {
      const startIndex = i * batchSize;
      const batchRanges = splitRanges.slice(startIndex, startIndex + batchSize);
      const results = await Promise.all(
        batchRanges.map(async ([start, end], index) => {
          const rawData = await storageFile.loadContent(start, end);
          // Handle the rare case when there is a BOM at the beginning of the file.
          // 'Use of a BOM is neither required nor recommended for UTF-8'.
          // Source: http://www.unicode.org/versions/Unicode5.0.0/ch02.pdf
          const data = (start === 0 && rawData.charCodeAt(0) === 0xFEFF)
            ? rawData.slice(1) : rawData;
          const taskEntity = Object.assign(
            { start: start.toString(), end: end.toString() }, taskBaseInfo);
          this.logger.debug(
            `[${startIndex + index}] Send ${data.length} bytes to Topic[${topic}].`);
          try {
            const result =
              await this.saveTaskAndSendData_(taskEntity, data, apiConfig);
            if (!result) {
              sendDataResult.failedTasks.push(
                { start, end, reason: 'Fail to update task.' });
            }
            return result;
          } catch (error) {
            sendDataResult.failedTasks.push({ start, end, reason: error.message })
          }
        }));
      currentResult = currentResult && results.every((result) => result);
    }
    if (currentResult !== true) {
      sendDataResult.status = TentaclesFileStatus.ERROR;
    }
    return sendDataResult;
  }