in src/main/scala/com/amazon/deequ/analyzers/runners/AnalysisRunner.scala [493:561]
/**
 * Computes the metrics of several grouping analyzers from one shared, precomputed frequency
 * table.
 *
 * Analyzers that are [[ScanShareableFrequencyBasedAnalyzer]]s are evaluated together in a single
 * aggregation over the frequencies. All remaining analyzers are evaluated one by one via
 * `computeMetricFrom`. When such analyzers exist, the frequencies are persisted first with
 * `storageLevelOfGroupedDataForMultiplePasses`, because they will be scanned several times.
 * The frequencies are always unpersisted before returning, also when an exception escapes.
 *
 * @param frequenciesAndNumRows grouped frequencies plus the `numRows` value handed to the
 *                              analyzers' aggregation functions
 * @param analyzers grouping analyzers to evaluate on `frequenciesAndNumRows`
 * @param saveStatesTo optional persister; if given (and `analyzers` is non-empty),
 *                     `frequenciesAndNumRows` is stored as the state of the first analyzer
 * @param storageLevelOfGroupedDataForMultiplePasses storage level used to cache the frequencies
 *                                                   when non-shareable analyzers are present
 * @return a context mapping each analyzer to its metric. Computation errors become failure
 *         metrics; exceptions raised by `saveStatesTo` are not caught and propagate.
 */
private[this] def runAnalyzersForParticularGrouping(
    frequenciesAndNumRows: FrequenciesAndNumRows,
    analyzers: Seq[GroupingAnalyzer[State[_], Metric[_]]],
    saveStatesTo: Option[StatePersister] = None,
    storageLevelOfGroupedDataForMultiplePasses: StorageLevel = StorageLevel.MEMORY_AND_DISK)
  : AnalyzerContext = {

  val numRows = frequenciesAndNumRows.numRows

  /* Identify all shareable analyzers */
  val (shareable, others) =
    analyzers.partition { _.isInstanceOf[ScanShareableFrequencyBasedAnalyzer] }

  /* Potentially cache the grouped data if we need to make several passes,
     controllable via the storage level */
  if (others.nonEmpty) {
    frequenciesAndNumRows.frequencies.persist(storageLevelOfGroupedDataForMultiplePasses)
  }

  /* The try/finally guarantees the (possibly) cached frequencies are released even if
     persisting the state below throws */
  try {
    val shareableAnalyzers =
      shareable.map { _.asInstanceOf[ScanShareableFrequencyBasedAnalyzer] }

    val metricsByAnalyzer = if (shareableAnalyzers.nonEmpty) {
      try {
        /* Build each analyzer's aggregation functions once and reuse them for the offsets */
        val aggregationsPerAnalyzer =
          shareableAnalyzers.map { _.aggregationFunctions(numRows) }
        val aggregations = aggregationsPerAnalyzer.flatten
        /* Compute offsets so that the analyzers can correctly pick their results from the row */
        val offsets = aggregationsPerAnalyzer.scanLeft(0) { _ + _.length }

        /* Execute aggregation on grouped data */
        val results = frequenciesAndNumRows.frequencies
          .agg(aggregations.head, aggregations.tail: _*)
          .collect()
          .head

        shareableAnalyzers.zip(offsets).map { case (analyzer, offset) =>
          analyzer -> successOrFailureMetricFrom(analyzer, results, offset)
        }
      } catch {
        /* A single aggregation serves all shareable analyzers, so one failure affects them all */
        case error: Exception =>
          shareableAnalyzers.map { analyzer => analyzer -> analyzer.toFailureMetric(error) }
      }
    } else {
      Seq.empty
    }

    /* Execute remaining analyzers on grouped data, each guarded separately so that a failing
       analyzer only produces a failure metric for itself */
    val otherMetrics = others.map { analyzer =>
      val metric: Metric[_] = try {
        analyzer.asInstanceOf[FrequencyBasedAnalyzer]
          .computeMetricFrom(Option(frequenciesAndNumRows))
      } catch {
        case error: Exception => analyzer.toFailureMetric(error)
      }
      analyzer -> metric
    }

    /* Potentially store states; the frequencies are stored once, under the first analyzer
       (presumably because every analyzer here shares them — confirm against the loader) */
    saveStatesTo.foreach { persister =>
      analyzers.headOption.foreach { analyzer =>
        persister.persist(analyzer, frequenciesAndNumRows)
      }
    }

    AnalyzerContext((metricsByAnalyzer ++ otherMetrics).toMap[Analyzer[_, Metric[_]], Metric[_]])
  } finally {
    frequenciesAndNumRows.frequencies.unpersist()
  }
}