in src/main/scala/com/amazon/deequ/profiles/ColumnProfiler.scala [625:673]
/**
 * Produces the per-column histogram distributions for the third profiling pass.
 *
 * If `nonExistingHistogramColumns` is non-empty, histograms for those columns are computed
 * from `data`, merged with `analyzerContextExistingValues`, handed to
 * `saveOrAppendResultsIfNecessary`, and the merged result is returned. Otherwise only the
 * distributions already present in `analyzerContextExistingValues` are returned.
 *
 * Only successfully computed histogram metrics end up in the result; entries of any other
 * analyzer type and failed histogram metrics are skipped.
 *
 * @param data the data the missing histograms are computed on
 * @param nonExistingHistogramColumns columns for which a histogram still has to be computed
 * @param analyzerContextExistingValues results that are reused instead of being recomputed
 * @param printStatusUpdates whether to print a note when the pass is skipped
 * @param failIfResultsForReusingMissing if true, throw instead of computing missing histograms
 * @param metricsRepository passed through to `saveOrAppendResultsIfNecessary`
 * @param saveInMetricsRepositoryUsingKey passed through to `saveOrAppendResultsIfNecessary`
 * @return a map from column name to the successfully computed distribution of that column
 * @throws ReusingNotPossibleResultsMissingException if histograms are missing and
 *         `failIfResultsForReusingMissing` is set
 */
def getHistogramsForThirdPass(
    data: DataFrame,
    nonExistingHistogramColumns: Seq[String],
    analyzerContextExistingValues: AnalyzerContext,
    printStatusUpdates: Boolean,
    failIfResultsForReusingMissing: Boolean,
    metricsRepository: Option[MetricsRepository],
    saveInMetricsRepositoryUsingKey: Option[ResultKey])
  : Map[String, Distribution] = {

  // Converts a context into the simpler column -> Distribution format. `collect` is used
  // rather than `map`: a guarded pattern-matching literal passed to `map` throws a
  // scala.MatchError on the first entry that is not a successful histogram metric, whereas
  // `collect` simply leaves such entries out.
  def successfulDistributions(context: AnalyzerContext): Map[String, Distribution] = {
    context.metricMap.collect {
      case (histogram: Histogram, metric: HistogramMetric) if metric.value.isSuccess =>
        histogram.column -> metric.value.get
    }
  }

  if (nonExistingHistogramColumns.nonEmpty) {
    // The caller demanded that everything be reusable, but some histograms are missing
    if (failIfResultsForReusingMissing) {
      throw new ReusingNotPossibleResultsMissingException(
        "Could not find all necessary results in the MetricsRepository, the calculation of " +
          "the histograms for these columns would be required: " +
          s"${nonExistingHistogramColumns.mkString(", ")}")
    }

    val columnNamesAndDistribution = computeHistograms(data, nonExistingHistogramColumns)

    // Merge the freshly computed results with the reused ones and store them if specified
    val analyzerAndHistogramMetrics = convertColumnNamesAndDistributionToHistogramWithMetric(
      columnNamesAndDistribution)
    val analyzerContext = AnalyzerContext(analyzerAndHistogramMetrics) ++
      analyzerContextExistingValues

    saveOrAppendResultsIfNecessary(analyzerContext, metricsRepository,
      saveInMetricsRepositoryUsingKey)

    successfulDistributions(analyzerContext)
  } else {
    // We do not need to calculate new histograms
    if (printStatusUpdates) {
      println("### PROFILING: Skipping pass (3/3), no new histograms need to be calculated.")
    }
    successfulDistributions(analyzerContextExistingValues)
  }
}