public async generateData()

in src/platform/packages/private/kbn-generate-csv/src/generate_csv.ts [238:482]


  /**
   * Runs the job's search page by page and writes the results as CSV through a
   * `MaxSizeStringBuilder` created over `this.stream`.
   *
   * Errors raised while paging through results are logged and converted into
   * entries of the returned `warnings` array instead of being rethrown, so a
   * partially written export can still be returned. If no rows were written
   * but warnings exist, the warnings are appended to the CSV content itself.
   *
   * @returns The task result: content type, formula and max-size flags, the
   * number of rows written, collected warnings and an optional error code.
   * @throws A task run error (user source) when creating the search source
   * fails with a 404 status code.
   * @throws Error when the search source has no `index` field.
   * Failures from reading export settings, creating the search source or
   * initializing the cursor are not caught here and propagate to the caller.
   */
  public async generateData(): Promise<TaskRunResult> {
    const logger = this.logger;

    const createSearchSource = async () => {
      try {
        const source = await this.dependencies.searchSourceStart.create(this.job.searchSource);
        return source;
      } catch (err) {
        // Saved object not found
        if (err?.output?.statusCode === 404) {
          const reportingError = new ReportingSavedObjectNotFoundError(err);
          throw createTaskRunError(reportingError, TaskErrorSource.USER);
        }
        throw err;
      }
    };

    // Renders an Elasticsearch error body for a user-facing warning. Object
    // bodies go through JSON.stringify because String() would collapse them to
    // "[object Object]"; strings and other primitives keep their String() form.
    const stringifyErrorBody = (body: unknown): string => {
      if (typeof body !== 'object' || body === null) {
        return String(body);
      }
      try {
        return JSON.stringify(body) ?? String(body);
      } catch {
        // e.g. circular structures: fall back to the plain string conversion
        return String(body);
      }
    };

    // the export settings and the search source are independent, so resolve them concurrently
    const [settings, searchSource] = await Promise.all([
      getExportSettings(
        this.clients.uiSettings,
        this.taskInstanceFields,
        this.config,
        this.job.browserTimezone,
        logger
      ),
      createSearchSource(),
    ]);

    const { startedAt, retryAt } = this.taskInstanceFields;
    if (startedAt) {
      this.logger.debug(
        `Task started at: ${startedAt && moment(startedAt).format()}.` +
          ` Can run until: ${retryAt && moment(retryAt).format()}`
      );
    }

    const index = searchSource.getField('index');

    if (!index) {
      throw new Error(`The search must have a reference to an index pattern!`);
    }

    const { maxSizeBytes, bom, escapeFormulaValues, timezone } = settings;
    const indexPatternTitle = index.getIndexPattern();
    const builder = new MaxSizeStringBuilder(this.stream, byteSizeValueToNumber(maxSizeBytes), bom);
    const warnings: string[] = [];
    let first = true;
    // starts at -1 so that, after adding row counts, it holds the zero-based index of the last row
    let currentRecord = -1;
    let totalRecords: number | undefined;
    let reportingError: undefined | ReportingError;

    // forward cancellation of the task to the cursor through the abort controller
    const abortController = new AbortController();
    this.cancellationToken.on(() => abortController.abort());

    // use a class to internalize the paging strategy
    let cursor: SearchCursor;
    if (this.job.pagingStrategy === 'scroll') {
      // Optional strategy: scan-and-scroll
      cursor = new SearchCursorScroll(
        indexPatternTitle,
        settings,
        this.clients,
        abortController,
        this.logger
      );
      logger.debug('Using search strategy: scroll');
    } else {
      // Default strategy: point-in-time
      cursor = new SearchCursorPit(
        indexPatternTitle,
        settings,
        this.clients,
        abortController,
        this.logger
      );
      logger.debug('Using search strategy: pit');
    }
    await cursor.initialize();

    // apply timezone from the job to all date field formatters
    // (best effort: a failure here is logged and the export continues)
    try {
      index.fields.getByType('date').forEach(({ name }) => {
        logger.debug(`Setting timezone on ${name}`);
        const format: FieldFormatConfig = {
          ...index.fieldFormatMap[name],
          id: index.fieldFormatMap[name]?.id || 'date', // allow id: date_nanos
          params: {
            ...index.fieldFormatMap[name]?.params,
            timezone,
          },
        };
        index.setFieldFormat(name, format);
      });
    } catch (err) {
      logger.error(err);
    }

    const columns = new Set<string>(this.job.columns ?? []);
    try {
      do {
        if (this.cancellationToken.isCancelled()) {
          break;
        }

        searchSource.setField('size', settings.scroll.size);

        let results: estypes.SearchResponse<unknown> | undefined;
        try {
          results = await cursor.getPage(searchSource);
        } catch (err) {
          this.logger.error(`CSV export search error: ${err}`);
          throw err;
        }

        if (!results) {
          logger.warn(`Search results are undefined!`);
          break;
        }

        // `total` is read either as an object carrying `value` or as a plain number
        const { total } = results.hits;
        const trackedTotal = total as estypes.SearchTotalHits;
        const currentTotal = trackedTotal?.value ?? total;

        if (first) {
          // export stops when totalRecords have been accumulated (or the results have run out)
          totalRecords = currentTotal;
        }

        // use the most recently received cursor id for the next search request
        cursor.updateIdFromResults(results);

        // check for shard failures, log them and add a warning if found
        const { _shards: shards } = results;
        if (shards.failures) {
          shards.failures.forEach(({ reason }) => {
            warnings.push(`Shard failure: ${JSON.stringify(reason)}`);
            logger.warn(JSON.stringify(reason));
          });
        }

        let table: Datatable | undefined;
        try {
          table = tabifyDocs(results, index, { shallow: true, includeIgnoredValues: true });
        } catch (err) {
          logger.error(err);
          warnings.push(i18nTexts.unknownError(err?.message ?? err));
        }

        if (!table) {
          break;
        }

        // without explicitly requested columns, collect them from the data of each page
        if (!this.job.columns?.length) {
          this.getColumnsFromTabify(table).forEach((column) => columns.add(column));
        }

        // the header is written once, using the columns known after the first page
        if (first) {
          first = false;
          this.generateHeader(columns, builder, settings, index);
        }

        if (table.rows.length < 1) {
          break; // empty report with just the header
        }

        // FIXME: make tabifyDocs handle the formatting, to get the same formatting logic as Discover?
        const formatters = this.getFormatters(table);
        await this.generateRows(columns, table, builder, formatters, settings);

        // update iterator
        currentRecord += table.rows.length;
      } while (totalRecords != null && currentRecord < totalRecords - 1);

      // Add warnings to be logged
      if (this.csvContainsFormulas && escapeFormulaValues) {
        warnings.push(i18nTexts.escapedFormulaValuesMessage);
      }
    } catch (err) {
      // paging errors become warnings so that whatever was already written can still be returned
      logger.error(err);
      if (err instanceof esErrors.ResponseError) {
        if ([401, 403].includes(err.statusCode ?? 0)) {
          reportingError = new AuthenticationExpiredError();
          warnings.push(i18nTexts.authenticationError.partialResultsMessage);
        } else {
          warnings.push(
            i18nTexts.esErrorMessage(err.statusCode ?? 0, stringifyErrorBody(err.body))
          );
        }
      } else {
        warnings.push(i18nTexts.unknownError(err?.message ?? err));
      }
    } finally {
      // always attempt to release the cursor, whether paging succeeded or failed
      try {
        await cursor.closeCursor();
      } catch (err) {
        logger.error(err);
        warnings.push(cursor.getUnableToCloseCursorMessage());
      }
    }

    logger.info(`Finished generating. Row count: ${this.csvRowCount}.`);

    if (!this.maxSizeReached && this.csvRowCount !== totalRecords) {
      logger.warn(
        `ES scroll returned ` +
          `${this.csvRowCount > (totalRecords ?? 0) ? 'more' : 'fewer'} total hits than expected!`
      );
      logger.warn(`Search result total hits: ${totalRecords}. Row count: ${this.csvRowCount}`);

      if (totalRecords || totalRecords === 0) {
        warnings.push(
          i18nTexts.csvRowCountError({ expected: totalRecords, received: this.csvRowCount })
        );
      } else {
        warnings.push(i18nTexts.csvRowCountIndeterminable({ received: this.csvRowCount }));
      }
    }

    if (this.csvRowCount === 0) {
      if (warnings.length > 0) {
        /*
         * Add the errors into the CSV content. This makes error messages more
         * discoverable. When the export was automated or triggered by an API
         * call, the user doesn't necessarily go through the Kibana UI to
         * download the export and might not otherwise see the error message.
         */
        logger.info('CSV export content was empty. Adding error messages to CSV export content.');
        // Put double quotes around each message, doubling any embedded double
        // quote so that the quoted field stays well-formed (messages such as
        // shard failures contain JSON).
        const quotedWarnings = warnings.map((warning) => `"${warning.replace(/"/g, '""')}"`);
        // add a leading newline so the first message is not treated as a header
        builder.tryAppend('\n' + quotedWarnings.join('\n'));
      } else {
        logger.info('CSV export content was empty. No error messages.');
      }
    }

    return {
      content_type: CONTENT_TYPE_CSV,
      // formulas are only reported as present when they were not escaped
      csv_contains_formulas: this.csvContainsFormulas && !escapeFormulaValues,
      max_size_reached: this.maxSizeReached,
      metrics: {
        csv: { rows: this.csvRowCount },
      },
      warnings,
      error_code: reportingError?.code,
    };
  }