in marketing-analytics/activation/data-tasks-coordinator/src/tasks/report_task.js [82:145]
async completeTask() {
/** @type {StorageFileConfig} */
const destination = this.config.destination;
const {bucket, name} = destination;
const report = this.getReport();
try {
const content = await report.getContent(this.parameters);
if (content === '') { // Empty report
this.logger.warn('Got empty report', this.parameters);
return {
parameters: this.appendParameter({ reportFile: 'EMPTY_REPORT' }),
};
}
this.logger.debug('Got result from report');
const storageFile = StorageFile.getInstance(
bucket,
name,
{
projectId: destination.projectId,
keyFilename: destination.keyFilename,
});
let downloadReport;
if (!content.pipe) {
this.logger.debug('Start to write string to gcs');
if (this.needCompression_(name)) {
downloadReport = new Promise((resolve, reject) => {
gzip(content, ((error, buffer) => {
if (error) reject(error);
resolve(storageFile.getFile().save(buffer));
}));
});
} else {
downloadReport = storageFile.getFile().save(content);
}
downloadReport = downloadReport.then(() => true);
} else {
this.logger.debug('Start to output stream to gcs');
const outputStream = storageFile.getFile().createWriteStream(
{ resumable: false }
);
const inputStream = this.needCompression_(name)
? content.pipe(createGzip())
: content;
downloadReport = new Promise((resolve, reject) => {
inputStream
.on('end', () => outputStream.end())
.on('error', (error) => outputStream.emit('error', error))
.pipe(outputStream)
.on('error', (error) => reject(error))
.on('finish', () => {
this.logger.debug('Uploaded to Cloud Storage');
resolve(true);
});
}
);
}
const timeoutWatcher = wait(TIMEOUT_IN_MILLISECOND, false);
const result = await Promise.race([timeoutWatcher, downloadReport]);
if (!result) throw new Error('Timeout');
return {parameters: this.appendParameter({reportFile: {bucket, name,}})};
} catch (error) {
this.triageError_(error);
}
}