in src/platform/packages/private/kbn-generate-csv/src/generate_csv.ts [238:482]
/**
 * Runs the job's search page by page and writes the formatted rows as CSV
 * through a `MaxSizeStringBuilder` wrapping `this.stream`.
 *
 * Flow:
 *  1. Load the export settings and create the search source in parallel.
 *  2. Create and initialize a paging cursor (scroll when the job asks for it,
 *     point-in-time otherwise).
 *  3. Apply the export timezone to the format of every date field of the index.
 *  4. Fetch pages until the expected total has been consumed, a page is
 *     empty/undefined, tabifying fails, or the job is cancelled.
 *  5. Always try to close the cursor, then summarize the outcome.
 *
 * Errors raised while paging are logged and turned into `warnings` instead of
 * being rethrown, so partial content can still be returned.
 *
 * @returns the task result: content type, formula flag, max-size flag,
 * row-count metric, collected warnings and an optional reporting error code.
 * @throws a task-run error (source `USER`) wrapping
 * `ReportingSavedObjectNotFoundError` when creating the search source fails
 * with a 404; any other search-source creation error is rethrown as-is.
 * @throws Error when the search source has no `index` field.
 * Failures of `getExportSettings` and `cursor.initialize()` are awaited outside
 * any try/catch and therefore also propagate to the caller.
 */
public async generateData(): Promise<TaskRunResult> {
  const logger = this.logger;

  const createSearchSource = async () => {
    try {
      const source = await this.dependencies.searchSourceStart.create(this.job.searchSource);
      return source;
    } catch (err) {
      // Saved object not found
      if (err?.output?.statusCode === 404) {
        const reportingError = new ReportingSavedObjectNotFoundError(err);
        throw createTaskRunError(reportingError, TaskErrorSource.USER);
      }
      throw err;
    }
  };

  // Renders an Elasticsearch error body for a warning message. `String(body)`
  // is kept for every value it renders usefully; only when it would collapse an
  // object to the uninformative "[object Object]" is the JSON form used instead.
  const describeErrorBody = (body: unknown): string => {
    const text = String(body);
    if (typeof body !== 'object' || body === null || text !== '[object Object]') {
      return text;
    }
    try {
      return JSON.stringify(body) ?? text;
    } catch {
      // not serializable (e.g. circular): fall back to the plain conversion
      return text;
    }
  };

  const [settings, searchSource] = await Promise.all([
    getExportSettings(
      this.clients.uiSettings,
      this.taskInstanceFields,
      this.config,
      this.job.browserTimezone,
      logger
    ),
    createSearchSource(),
  ]);

  const { startedAt, retryAt } = this.taskInstanceFields;
  if (startedAt) {
    this.logger.debug(
      `Task started at: ${moment(startedAt).format()}.` +
        ` Can run until: ${retryAt && moment(retryAt).format()}`
    );
  }

  const index = searchSource.getField('index');
  if (!index) {
    throw new Error(`The search must have a reference to an index pattern!`);
  }

  const { maxSizeBytes, bom, escapeFormulaValues, timezone } = settings;
  const indexPatternTitle = index.getIndexPattern();
  const builder = new MaxSizeStringBuilder(this.stream, byteSizeValueToNumber(maxSizeBytes), bom);
  const warnings: string[] = [];
  let first = true;
  // starts at -1 so that, after adding row counts, it holds the index of the last consumed record
  let currentRecord = -1;
  let totalRecords: number | undefined;
  let reportingError: undefined | ReportingError;

  // cancelling the job aborts the controller handed to the cursor
  const abortController = new AbortController();
  this.cancellationToken.on(() => abortController.abort());

  // use a class to internalize the paging strategy
  let cursor: SearchCursor;
  if (this.job.pagingStrategy === 'scroll') {
    // Optional strategy: scan-and-scroll
    cursor = new SearchCursorScroll(
      indexPatternTitle,
      settings,
      this.clients,
      abortController,
      this.logger
    );
    logger.debug('Using search strategy: scroll');
  } else {
    // Default strategy: point-in-time
    cursor = new SearchCursorPit(
      indexPatternTitle,
      settings,
      this.clients,
      abortController,
      this.logger
    );
    logger.debug('Using search strategy: pit');
  }
  await cursor.initialize();

  // apply timezone from the job to all date field formatters
  // (best effort: a failure here is logged and the export continues)
  try {
    index.fields.getByType('date').forEach(({ name }) => {
      logger.debug(`Setting timezone on ${name}`);
      const format: FieldFormatConfig = {
        ...index.fieldFormatMap[name],
        id: index.fieldFormatMap[name]?.id || 'date', // allow id: date_nanos
        params: {
          ...index.fieldFormatMap[name]?.params,
          timezone,
        },
      };
      index.setFieldFormat(name, format);
    });
  } catch (err) {
    logger.error(err);
  }

  // explicit job columns if given; otherwise filled in from the tabified pages below
  const columns = new Set<string>(this.job.columns ?? []);
  try {
    do {
      if (this.cancellationToken.isCancelled()) {
        break;
      }

      searchSource.setField('size', settings.scroll.size);

      let results: estypes.SearchResponse<unknown> | undefined;
      try {
        results = await cursor.getPage(searchSource);
      } catch (err) {
        // log with context here; the outer catch converts the error into a warning
        this.logger.error(`CSV export search error: ${err}`);
        throw err;
      }

      if (!results) {
        logger.warn(`Search results are undefined!`);
        break;
      }

      // `total` is read either as an object carrying `value` or as the value itself
      const { total } = results.hits;
      const trackedTotal = total as estypes.SearchTotalHits;
      const currentTotal = trackedTotal?.value ?? total;

      if (first) {
        // export stops when totalRecords have been accumulated (or the results have run out)
        totalRecords = currentTotal;
      }

      // use the most recently received cursor id for the next search request
      cursor.updateIdFromResults(results);

      // check for shard failures, log them and add a warning if found
      const { _shards: shards } = results;
      if (shards.failures) {
        shards.failures.forEach(({ reason }) => {
          warnings.push(`Shard failure: ${JSON.stringify(reason)}`);
          logger.warn(JSON.stringify(reason));
        });
      }

      let table: Datatable | undefined;
      try {
        table = tabifyDocs(results, index, { shallow: true, includeIgnoredValues: true });
      } catch (err) {
        logger.error(err);
        warnings.push(i18nTexts.unknownError(err?.message ?? err));
      }

      if (!table) {
        break;
      }

      if (!this.job.columns?.length) {
        this.getColumnsFromTabify(table).forEach((column) => columns.add(column));
      }

      if (first) {
        first = false;
        this.generateHeader(columns, builder, settings, index);
      }

      if (table.rows.length < 1) {
        break; // empty report with just the header
      }

      // FIXME: make tabifyDocs handle the formatting, to get the same formatting logic as Discover?
      const formatters = this.getFormatters(table);
      await this.generateRows(columns, table, builder, formatters, settings);

      // update iterator
      currentRecord += table.rows.length;
    } while (totalRecords != null && currentRecord < totalRecords - 1);

    // Add warnings to be logged
    if (this.csvContainsFormulas && escapeFormulaValues) {
      warnings.push(i18nTexts.escapedFormulaValuesMessage);
    }
  } catch (err) {
    logger.error(err);
    if (err instanceof esErrors.ResponseError) {
      if ([401, 403].includes(err.statusCode ?? 0)) {
        reportingError = new AuthenticationExpiredError();
        warnings.push(i18nTexts.authenticationError.partialResultsMessage);
      } else {
        warnings.push(
          i18nTexts.esErrorMessage(err.statusCode ?? 0, describeErrorBody(err.body))
        );
      }
    } else {
      warnings.push(i18nTexts.unknownError(err?.message ?? err));
    }
  } finally {
    // always attempt to release the cursor; a failure to do so is only a warning
    try {
      await cursor.closeCursor();
    } catch (err) {
      logger.error(err);
      warnings.push(cursor.getUnableToCloseCursorMessage());
    }
  }

  logger.info(`Finished generating. Row count: ${this.csvRowCount}.`);

  // a row-count mismatch is only reported when the output was not cut off by the size limit
  if (!this.maxSizeReached && this.csvRowCount !== totalRecords) {
    // NOTE(review): this message says "scroll" even when the point-in-time cursor
    // was used; the text is left unchanged because it is a runtime string.
    logger.warn(
      `ES scroll returned ` +
        `${this.csvRowCount > (totalRecords ?? 0) ? 'more' : 'fewer'} total hits than expected!`
    );
    logger.warn(`Search result total hits: ${totalRecords}. Row count: ${this.csvRowCount}`);

    if (totalRecords || totalRecords === 0) {
      warnings.push(
        i18nTexts.csvRowCountError({ expected: totalRecords, received: this.csvRowCount })
      );
    } else {
      warnings.push(i18nTexts.csvRowCountIndeterminable({ received: this.csvRowCount }));
    }
  }

  if (this.csvRowCount === 0) {
    if (warnings.length > 0) {
      /*
       * Add the errors into the CSV content. This makes error messages more
       * discoverable. When the export was automated or triggered by an API
       * call, the user doesn't necessarily go through the Kibana UI to
       * download the export and might not otherwise see the error message.
       */
      logger.info('CSV export content was empty. Adding error messages to CSV export content.');
      // join the string array, putting double quotes around each item
      // add a leading newline so the first message is not treated as a header
      builder.tryAppend('\n"' + warnings.join('"\n"') + '"');
    } else {
      logger.info('CSV export content was empty. No error messages.');
    }
  }

  return {
    content_type: CONTENT_TYPE_CSV,
    // only flagged when formulas are present and were NOT escaped
    csv_contains_formulas: this.csvContainsFormulas && !escapeFormulaValues,
    max_size_reached: this.maxSizeReached,
    metrics: {
      csv: { rows: this.csvRowCount },
    },
    warnings,
    error_code: reportingError?.code,
  };
}