in src/main/scala/com/amazon/deequ/analyzers/runners/AnalysisRunner.scala [97:207]
/**
  * Computes the metrics for the given analyzers on the data in a minimal number of passes,
  * reusing metrics already stored in the metrics repository (if configured) and optionally
  * persisting the resulting metrics and internal analyzer states.
  *
  * @param data data frame to analyze
  * @param analyzers analyzers to evaluate; duplicates are rejected via `require`
  * @param aggregateWith optional loader for previously persisted states to aggregate with
  * @param saveStatesWith optional persister for the internal states of this analysis
  * @param storageLevelOfGroupedDataForMultiplePasses storage level used to cache grouped
  *   data that must be scanned in multiple passes
  * @param metricsRepositoryOptions options controlling reuse and persistence of metrics
  * @param fileOutputOptions options controlling JSON output to the filesystem
  * @return an [[AnalyzerContext]] holding the computed metric per analyzer
  * @throws ReusingNotPossibleResultsMissingException if `failIfResultsForReusingMissing` is
  *   set and some requested metrics were not found in the repository
  */
private[deequ] def doAnalysisRun(
    data: DataFrame,
    analyzers: Seq[Analyzer[_, Metric[_]]],
    aggregateWith: Option[StateLoader] = None,
    saveStatesWith: Option[StatePersister] = None,
    storageLevelOfGroupedDataForMultiplePasses: StorageLevel = StorageLevel.MEMORY_AND_DISK,
    metricsRepositoryOptions: AnalysisRunnerRepositoryOptions =
      AnalysisRunnerRepositoryOptions(),
    fileOutputOptions: AnalysisRunnerFileOutputOptions =
      AnalysisRunnerFileOutputOptions())
  : AnalyzerContext = {

  if (analyzers.isEmpty) {
    AnalyzerContext.empty
  } else {
    val allAnalyzers = analyzers.map { _.asInstanceOf[Analyzer[State[_], Metric[_]]] }
    val distinctAnalyzers = allAnalyzers.distinct
    require(distinctAnalyzers.size == allAnalyzers.size,
      s"Duplicate analyzers found: ${allAnalyzers.diff(distinctAnalyzers).distinct}")

    /* We do not want to recalculate metrics that are already in the MetricsRepository */
    val resultsComputedPreviously: AnalyzerContext = {
      val maybePreviousResults = for {
        repository <- metricsRepositoryOptions.metricsRepository
        resultKey <- metricsRepositoryOptions.reuseExistingResultsForKey
        previousResults <- repository.loadByKey(resultKey)
      } yield previousResults

      maybePreviousResults.getOrElse(AnalyzerContext.empty)
    }

    val analyzersAlreadyRan = resultsComputedPreviously.metricMap.keys.toSet
    val analyzersToRun = allAnalyzers.filterNot(analyzersAlreadyRan.contains)

    /* Throw an error if all needed metrics should have gotten calculated before but did not */
    if (metricsRepositoryOptions.failIfResultsForReusingMissing && analyzersToRun.nonEmpty) {
      throw new ReusingNotPossibleResultsMissingException(
        "Could not find all necessary results in the MetricsRepository, the calculation of " +
          s"the metrics for these analyzers would be needed: ${analyzersToRun.mkString(", ")}")
    }

    /* Split off analyzers which violate their preconditions on the schema
     * (single partition instead of filter + diff, which was quadratic) */
    val (passedAnalyzers, failedAnalyzers) = analyzersToRun.partition { analyzer =>
      Preconditions.findFirstFailing(data.schema, analyzer.preconditions).isEmpty
    }

    /* Create the failure metrics from the precondition violations */
    val preconditionFailures = computePreconditionFailureMetrics(failedAnalyzers, data.schema)

    /* Identify analyzers which require us to group the data */
    val (groupingAnalyzers, allScanningAnalyzers) =
      passedAnalyzers.partition { _.isInstanceOf[GroupingAnalyzer[State[_], Metric[_]]] }

    /* KLL sketches are computed in a dedicated extra pass over the data */
    val (kllAnalyzers, scanningAnalyzers) =
      allScanningAnalyzers.partition { _.isInstanceOf[KLLSketch] }

    val kllMetrics =
      if (kllAnalyzers.nonEmpty) {
        KLLRunner.computeKLLSketchesInExtraPass(data, kllAnalyzers, aggregateWith, saveStatesWith)
      } else {
        AnalyzerContext.empty
      }

    /* Run the analyzers which do not require grouping in a single pass over the data */
    val nonGroupedMetrics =
      runScanningAnalyzers(data, scanningAnalyzers, aggregateWith, saveStatesWith)

    // TODO this can be further improved, we can get the number of rows from other metrics as well
    // TODO we could also insert an extra Size() computation if we have to scan the data anyways
    val initialNumRowsOfData = nonGroupedMetrics.metric(Size()).collect {
      case DoubleMetric(_, _, _, Success(value: Double)) => value.toLong
    }

    /* Run grouping analyzers based on the columns which they need to group on, threading the
     * row count through the fold: once known (from Size() above or from the first grouping
     * pass), it is reused for all subsequent passes instead of being recomputed. */
    val (_, groupedMetrics) = groupingAnalyzers
      .map { _.asInstanceOf[GroupingAnalyzer[State[_], Metric[_]]] }
      .groupBy { analyzer => (analyzer.groupingColumns().sorted, getFilterCondition(analyzer)) }
      .foldLeft((initialNumRowsOfData, AnalyzerContext.empty)) {
        case ((numRowsOfData, metricsSoFar),
              ((groupingColumns, filterCondition), analyzersForGrouping)) =>

          val (numRows, metrics) =
            runGroupingAnalyzers(data, groupingColumns, filterCondition, analyzersForGrouping,
              aggregateWith, saveStatesWith, storageLevelOfGroupedDataForMultiplePasses,
              numRowsOfData)

          /* if we don't know the size of the data yet, we know it after the first pass */
          (numRowsOfData.orElse(Some(numRows)), metricsSoFar ++ metrics)
      }

    val resultingAnalyzerContext = resultsComputedPreviously ++ preconditionFailures ++
      nonGroupedMetrics ++ groupedMetrics ++ kllMetrics

    saveOrAppendResultsIfNecessary(
      resultingAnalyzerContext,
      metricsRepositoryOptions.metricsRepository,
      metricsRepositoryOptions.saveOrAppendResultsWithKey)

    saveJsonOutputsToFilesystemIfNecessary(fileOutputOptions, resultingAnalyzerContext)

    resultingAnalyzerContext
  }
}