in src/main/scala/com/amazon/deequ/profiles/ColumnProfiler.scala [92:210]
/**
 * Profiles the given data in three passes over the DataFrame.
 *
 * Pass 1 computes generic statistics (completeness, approx. distinct counts,
 * and type detection for string columns) plus the overall Size. Pass 2 casts
 * string columns detected as numeric and computes numeric statistics
 * (optionally KLL sketches). Pass 3 computes exact histograms for
 * low-cardinality columns, reusing previously stored histogram metrics from
 * the repository where available.
 *
 * @param data the data to profile
 * @param restrictToColumns if set, only these columns are profiled; every
 *        listed column must exist in the schema
 * @param printStatusUpdates whether to print per-pass progress to stdout
 * @param lowCardinalityHistogramThreshold max approx. distinct count for a
 *        column to get an exact histogram in pass 3
 * @param metricsRepository optional repository for reusing/persisting metrics
 * @param reuseExistingResultsUsingKey key under which reusable results are
 *        looked up in the repository
 * @param failIfResultsForReusingMissing fail instead of recomputing when
 *        results to reuse are missing
 * @param saveInMetricsRepositoryUsingKey key under which computed metrics are
 *        stored in the repository
 * @param kllProfiling whether to additionally compute KLL sketches in pass 2
 * @param kllParameters parameters for KLL sketch computation
 * @param predefinedTypes known column types, exempting those columns from
 *        type detection
 * @return the combined column profiles
 * @throws IllegalArgumentException if a column in restrictToColumns is absent
 */
private[deequ] def profile(
    data: DataFrame,
    restrictToColumns: Option[Seq[String]] = None,
    printStatusUpdates: Boolean = false,
    lowCardinalityHistogramThreshold: Int =
      ColumnProfiler.DEFAULT_CARDINALITY_THRESHOLD,
    metricsRepository: Option[MetricsRepository] = None,
    reuseExistingResultsUsingKey: Option[ResultKey] = None,
    failIfResultsForReusingMissing: Boolean = false,
    saveInMetricsRepositoryUsingKey: Option[ResultKey] = None,
    kllProfiling: Boolean = false,
    kllParameters: Option[KLLParameters] = None,
    predefinedTypes: Map[String, DataTypeInstances.Value] = Map.empty)
  : ColumnProfiles = {

  // Ensure that all desired columns exist (avoid shadowing the parameter name)
  restrictToColumns.foreach { requestedColumns =>
    requestedColumns.foreach { columnName =>
      require(data.schema.fieldNames.contains(columnName),
        s"Unable to find column $columnName")
    }
  }

  // Find columns we want to profile
  val relevantColumns = getRelevantColumns(data.schema, restrictToColumns)

  // First pass: completeness, approximate number of distinct values,
  // and type detection for string columns
  if (printStatusUpdates) {
    println("### PROFILING: Computing generic column statistics in pass (1/3)...")
  }

  val analyzersForGenericStats = getAnalyzersForGenericStats(
    data.schema,
    relevantColumns,
    predefinedTypes)

  // Build the runner as a single expression instead of a var reassignment
  val analysisRunnerFirstPass = setMetricsRepositoryConfigurationIfNecessary(
    AnalysisRunner
      .onData(data)
      .addAnalyzers(analyzersForGenericStats)
      .addAnalyzer(Size()),
    metricsRepository,
    reuseExistingResultsUsingKey,
    failIfResultsForReusingMissing,
    saveInMetricsRepositoryUsingKey)

  val firstPassResults = analysisRunnerFirstPass.run()

  val genericStatistics = extractGenericStatistics(
    relevantColumns,
    data.schema,
    firstPassResults,
    predefinedTypes)

  // Second pass: mean, stddev, min, max (and optionally KLL) for numeric columns
  if (printStatusUpdates) {
    println("### PROFILING: Computing numeric column statistics in pass (2/3)...")
  }

  // Cast all string columns that were detected as numeric
  val castedDataForSecondPass = castNumericStringColumns(relevantColumns, data,
    genericStatistics)

  val analyzersForSecondPass = getAnalyzersForSecondPass(relevantColumns,
    genericStatistics, kllProfiling, kllParameters)

  val analysisRunnerSecondPass = setMetricsRepositoryConfigurationIfNecessary(
    AnalysisRunner
      .onData(castedDataForSecondPass)
      .addAnalyzers(analyzersForSecondPass),
    metricsRepository,
    reuseExistingResultsUsingKey,
    failIfResultsForReusingMissing,
    saveInMetricsRepositoryUsingKey)

  val secondPassResults = analysisRunnerSecondPass.run()

  val numericStatistics = extractNumericStatistics(secondPassResults)

  // Third pass: exact histograms for all low-cardinality string columns
  if (printStatusUpdates) {
    println("### PROFILING: Computing histograms of low-cardinality columns in pass (3/3)...")
  }

  val targetColumnsForHistograms = findTargetColumnsForHistograms(data.schema, genericStatistics,
    lowCardinalityHistogramThreshold)

  // Find out if we have values for those we can reuse
  val analyzerContextExistingValues = getAnalyzerContextWithHistogramResultsForReusingIfNecessary(
    metricsRepository,
    reuseExistingResultsUsingKey,
    targetColumnsForHistograms
  )

  // The columns we still need to calculate the histograms for
  val nonExistingHistogramColumns = targetColumnsForHistograms
    .filter { column => analyzerContextExistingValues.metricMap.get(Histogram(column)).isEmpty }

  // Calculate and save/append results if necessary
  val histograms: Map[String, Distribution] = getHistogramsForThirdPass(
    data,
    nonExistingHistogramColumns,
    analyzerContextExistingValues,
    printStatusUpdates,
    failIfResultsForReusingMissing,
    metricsRepository,
    saveInMetricsRepositoryUsingKey)

  val thirdPassResults = CategoricalColumnStatistics(histograms)

  createProfiles(relevantColumns, genericStatistics, numericStatistics, thirdPassResults)
}