in src/main/scala/com/amazon/deequ/analyzers/runners/AnalysisRunner.scala [398:473]
/**
 * Computes metrics from previously persisted analyzer states instead of scanning data.
 *
 * For every analyzer in `analysis` whose preconditions hold for `schema`, the states returned
 * by each of the `stateLoaders` are merged into one in-memory state. The metric is then computed
 * from that merged state. Analyzers whose preconditions fail yield failure metrics instead.
 *
 * @param schema schema against which the analyzers' preconditions are checked
 * @param analysis analyzers to compute metrics for; an analyzer listed more than once is
 *                 evaluated only once
 * @param stateLoaders sources of the partial states that are merged per analyzer
 * @param saveStatesWith if defined, every aggregated state is copied to this persister
 * @param storageLevelOfGroupedDataForMultiplePasses storage level handed to the computation
 *                                                   of grouping analyzers
 * @param metricsRepository optional repository the results are saved to or appended to
 * @param saveOrAppendResultsWithKey optional key under which the results are saved/appended
 * @return the computed metrics, or an empty context when there are no analyzers or no
 *         state loaders
 */
def runOnAggregatedStates(
    schema: StructType,
    analysis: Analysis,
    stateLoaders: Seq[StateLoader],
    saveStatesWith: Option[StatePersister] = None,
    storageLevelOfGroupedDataForMultiplePasses: StorageLevel = StorageLevel.MEMORY_AND_DISK,
    metricsRepository: Option[MetricsRepository] = None,
    saveOrAppendResultsWithKey: Option[ResultKey] = None)
  : AnalyzerContext = {

  if (analysis.analyzers.isEmpty || stateLoaders.isEmpty) {
    AnalyzerContext.empty
  } else {
    /* States are merged into `aggregatedStates` per analyzer, once for every occurrence of that
     * analyzer below. An analyzer listed twice would therefore have the loaded states summed in
     * twice (e.g. doubling a count). Results are keyed by analyzer anyway, so evaluate each
     * distinct analyzer exactly once. */
    val analyzers = analysis.analyzers
      .map { _.asInstanceOf[Analyzer[State[_], Metric[_]]] }
      .distinct

    /* Split analyzers into those whose preconditions hold and those violating them */
    val (passedAnalyzers, failedAnalyzers) = analyzers.partition { analyzer =>
      Preconditions.findFirstFailing(schema, analyzer.preconditions).isEmpty
    }

    /* Create the failure metrics from the precondition violations */
    val preconditionFailures = computePreconditionFailureMetrics(failedAnalyzers, schema)

    val aggregatedStates = InMemoryStateProvider()

    /* Aggregate all initial states: merge each loader's state into the running aggregate */
    passedAnalyzers.foreach { analyzer =>
      stateLoaders.foreach { stateLoader =>
        analyzer.aggregateStateTo(aggregatedStates, stateLoader, aggregatedStates)
      }
    }

    /* Identify analyzers which require us to group the data */
    val (groupingAnalyzers, scanningAnalyzers) =
      passedAnalyzers.partition { _.isInstanceOf[GroupingAnalyzer[State[_], Metric[_]]] }

    val nonGroupedResults = scanningAnalyzers
      .flatMap { analyzer =>
        val metrics = analyzer.loadStateAndComputeMetric(aggregatedStates)

        /* Store aggregated state if a 'saveStatesWith' has been provided */
        saveStatesWith.foreach { persister => analyzer.copyStateTo(aggregatedStates, persister) }

        metrics.map { metric => analyzer -> metric }
      }
      .toMap[Analyzer[_, Metric[_]], Metric[_]]

    val groupedResults = if (groupingAnalyzers.isEmpty) {
      AnalyzerContext.empty
    } else {
      /* Analyzers sharing the same (order-insensitive) grouping columns share one state */
      groupingAnalyzers
        .map { _.asInstanceOf[GroupingAnalyzer[State[_], Metric[_]]] }
        .groupBy { _.groupingColumns().sorted }
        .map { case (_, analyzersForGrouping) =>
          val state = findStateForParticularGrouping(analyzersForGrouping, aggregatedStates)

          runAnalyzersForParticularGrouping(state, analyzersForGrouping, saveStatesWith,
            storageLevelOfGroupedDataForMultiplePasses)
        }
        .reduce { _ ++ _ } // non-empty: guarded by the isEmpty check above
    }

    val results = preconditionFailures ++ AnalyzerContext(nonGroupedResults) ++ groupedResults

    saveOrAppendResultsIfNecessary(results, metricsRepository, saveOrAppendResultsWithKey)

    results
  }
}