async completeTask()

in marketing-analytics/activation/data-tasks-coordinator/src/tasks/report_task.js [82:145]


  async completeTask() {
    /** @type {StorageFileConfig} */
    const destination = this.config.destination;
    const {bucket, name} = destination;
    const report = this.getReport();
    try {
      const content = await report.getContent(this.parameters);
      if (content === '') { // Empty report
        this.logger.warn('Got empty report', this.parameters);
        return {
          parameters: this.appendParameter({ reportFile: 'EMPTY_REPORT' }),
        };
      }
      this.logger.debug('Got result from report');
      const storageFile = StorageFile.getInstance(
          bucket,
          name,
          {
            projectId: destination.projectId,
            keyFilename: destination.keyFilename,
          });
      let downloadReport;
      if (!content.pipe) {
        this.logger.debug('Start to write string to gcs');
        if (this.needCompression_(name)) {
          downloadReport = new Promise((resolve, reject) => {
            gzip(content, ((error, buffer) => {
              if (error) reject(error);
              resolve(storageFile.getFile().save(buffer));
            }));
          });
        } else {
          downloadReport = storageFile.getFile().save(content);
        }
        downloadReport = downloadReport.then(() => true);
      } else {
        this.logger.debug('Start to output stream to gcs');
        const outputStream = storageFile.getFile().createWriteStream(
          { resumable: false }
        );
        const inputStream = this.needCompression_(name)
          ? content.pipe(createGzip())
          : content;
        downloadReport = new Promise((resolve, reject) => {
          inputStream
            .on('end', () => outputStream.end())
            .on('error', (error) => outputStream.emit('error', error))
            .pipe(outputStream)
            .on('error', (error) => reject(error))
            .on('finish', () => {
              this.logger.debug('Uploaded to Cloud Storage');
              resolve(true);
            });
        }
        );
      }
      const timeoutWatcher = wait(TIMEOUT_IN_MILLISECOND, false);
      const result = await Promise.race([timeoutWatcher, downloadReport]);
      if (!result) throw new Error('Timeout');
      return {parameters: this.appendParameter({reportFile: {bucket, name,}})};
    } catch (error) {
      this.triageError_(error);
    }
  }