in src/core.js [284:370]
async execute(destDatasetId, sources, options) {
options = options || {};
let extensions = options.extensions || Object.keys(this.extensions);
let resultsToUpdate = [], allNewResults = [];
let extResponse;
let multiRowsGatherer = options.multiRowsGatherer;
assert(destDatasetId, 'destDatasetId is missing');
// Before each run.
sources.forEach(source => {
extResponse = this.runExtensions(extensions, 'beforeRun', { source: source });
source.errors = extResponse.errors;
});
// Run one source at a time and collect metrics from all gatherers.
for (let i = 0; i < sources.length; i++) {
let source = sources[i];
let statuses = [];
// Create a dummy Result.
let newResult = this.createNewResult(source, options);
// Collect metrics from all gatherers.
let gathererNames = this.parseGathererNames(source.gatherer);
gathererNames = gathererNames.concat(this.parseGathererNames(options.gatherer));
[...new Set(gathererNames)].forEach(gathererName => {
let gathererOptions = options[gathererName];
let response = this.runGatherer(source, gathererName, gathererOptions);
if (response) {
newResult[gathererName] = response;
statuses.push(newResult[gathererName].status);
}
});
// Update overall status.
newResult.status = this.getOverallStatus(statuses);
// Collect errors from all gatherers.
newResult.errors = this.getOverallErrors(newResult);
// After each run
extResponse = this.runExtensions(extensions, 'afterRun', {
source: source,
result: newResult,
});
newResult.errors = newResult.errors.concat(extResponse.errors);
// Split array data result into multiple rows.
if (multiRowsGatherer) {
let data = newResult[multiRowsGatherer].data || [];
if (Array.isArray(data)) {
data.forEach(rowData => {
let rowResult = { ...newResult };
rowResult[multiRowsGatherer] = {
status: newResult.status,
statusText: newResult.statusText,
metadata: newResult.metadata,
data: rowData,
errors: newResult.errors,
};
resultsToUpdate.push(rowResult);
allNewResults.push(rowResult);
});
}
} else {
// Collect sources and results for batch update if applicable.
resultsToUpdate.push(newResult);
allNewResults.push(newResult);
}
// Batch update to the connector if the buffer is full.
if (this.batchUpdateBuffer &&
resultsToUpdate.length >= this.batchUpdateBuffer) {
await this.connector.appendDataList(destDatasetId, resultsToUpdate, options);
this.log(`DataGathererFramework::retrieve, batch appends ` +
`${resultsToUpdate.length} results.`);
resultsToUpdate = [];
}
}
// Update the remaining.
await this.connector.appendDataList(destDatasetId, resultsToUpdate, options);
return allNewResults;
}