async execute()

in src/core.js [284:370]


  async execute(destDatasetId, sources, options) {
    options = options || {};
    let extensions = options.extensions || Object.keys(this.extensions);
    let resultsToUpdate = [], allNewResults = [];
    let extResponse;
    let multiRowsGatherer = options.multiRowsGatherer;

    assert(destDatasetId, 'destDatasetId is missing');

    // Before each run.
    sources.forEach(source => {
      extResponse = this.runExtensions(extensions, 'beforeRun', { source: source });
      source.errors = extResponse.errors;
    });

    // Run one source at a time and collect metrics from all gatherers.
    for (let i = 0; i < sources.length; i++) {
      let source = sources[i];
      let statuses = [];

      // Create a dummy Result.
      let newResult = this.createNewResult(source, options);

      // Collect metrics from all gatherers.
      let gathererNames = this.parseGathererNames(source.gatherer);
      gathererNames = gathererNames.concat(this.parseGathererNames(options.gatherer));
      [...new Set(gathererNames)].forEach(gathererName => {
        let gathererOptions = options[gathererName];
        let response = this.runGatherer(source, gathererName, gathererOptions);
        if (response) {
          newResult[gathererName] = response;
          statuses.push(newResult[gathererName].status);
        }
      });

      // Update overall status.
      newResult.status = this.getOverallStatus(statuses);

      // Collect errors from all gatherers.
      newResult.errors = this.getOverallErrors(newResult);

      // After each run
      extResponse = this.runExtensions(extensions, 'afterRun', {
        source: source,
        result: newResult,
      });
      newResult.errors = newResult.errors.concat(extResponse.errors);

      // Split array data result into multiple rows.
      if (multiRowsGatherer) {
        let data = newResult[multiRowsGatherer].data || [];
        if (Array.isArray(data)) {
          data.forEach(rowData => {
            let rowResult = { ...newResult };
            rowResult[multiRowsGatherer] = {
              status: newResult.status,
              statusText: newResult.statusText,
              metadata: newResult.metadata,
              data: rowData,
              errors: newResult.errors,
            };
            resultsToUpdate.push(rowResult);
            allNewResults.push(rowResult);
          });
        }

      } else {
        // Collect sources and results for batch update if applicable.
        resultsToUpdate.push(newResult);
        allNewResults.push(newResult);
      }

      // Batch update to the connector if the buffer is full.
      if (this.batchUpdateBuffer &&
        resultsToUpdate.length >= this.batchUpdateBuffer) {
        await this.connector.appendDataList(destDatasetId, resultsToUpdate, options);
        this.log(`DataGathererFramework::retrieve, batch appends ` +
          `${resultsToUpdate.length} results.`);
        resultsToUpdate = [];
      }
    }

    // Update the remaining.
    await this.connector.appendDataList(destDatasetId, resultsToUpdate, options);

    return allNewResults;
  }