in src/main/scala/com/amazon/deequ/suggestions/ConstraintSuggestionRunner.scala [63:136]
/**
 * Splits `data` into a training and an (optional) test set, profiles the training set,
 * derives constraint suggestions from it, hands profiles and suggestions to the JSON
 * output helpers, evaluates the suggestions via `evaluateConstraintsIfNecessary` and
 * bundles everything into a [[ConstraintSuggestionResult]].
 *
 * @param data the input data that is split by `splitTrainTestSets`
 * @param constraintRules rules forwarded to `profileAndSuggest`
 * @param restrictToColumns optional column restriction forwarded to `profileAndSuggest`
 * @param lowCardinalityHistogramThreshold threshold forwarded to `profileAndSuggest`
 * @param printStatusUpdates flag forwarded to profiling, the output helpers and evaluation
 * @param testsetWrapper pair of (testset ratio, random seed for the train/test split);
 *                       a defined ratio must lie strictly between 0 and 1
 * @param cacheInputs if true, training and test data are cached for the duration of this
 *                    call and unpersisted again before it returns (also on failure)
 * @param fileOutputOptions options passed to the JSON output helpers and to evaluation
 * @param metricsRepositoryOptions options forwarded to `profileAndSuggest`
 * @param kllWrapper pair of (KLL parameters, predefined data types); the map keys are
 *                   presumably column names -- confirm against the caller
 * @return the column profiles, the number of records, the suggestions grouped by column
 *         name and the result of `evaluateConstraintsIfNecessary`
 * @throws IllegalArgumentException if a testset ratio is given that is not in ]0, 1[
 */
private[suggestions] def run(
    data: DataFrame,
    constraintRules: Seq[ConstraintRule[ColumnProfile]],
    restrictToColumns: Option[Seq[String]],
    lowCardinalityHistogramThreshold: Int,
    printStatusUpdates: Boolean,
    testsetWrapper: (Option[Double], Option[Long]),
    cacheInputs: Boolean,
    fileOutputOptions: ConstraintSuggestionFileOutputOptions,
    metricsRepositoryOptions: ConstraintSuggestionMetricsRepositoryOptions,
    kllWrapper: (Option[KLLParameters], Map[String, DataTypeInstances.Value]))
  : ConstraintSuggestionResult = {

  // Unpack the wrapper tuples into their named components
  val (testsetRatio, testsetSplitRandomSeed) = testsetWrapper
  val (kllParameters, predefinedTypes) = kllWrapper

  testsetRatio.foreach { ratio =>
    require(ratio > 0 && ratio < 1.0, "Testset ratio must be in ]0, 1[")
  }

  val (trainingData, testData) = splitTrainTestSets(data, testsetRatio, testsetSplitRandomSeed)

  if (cacheInputs) {
    trainingData.cache()
    testData.foreach { _.cache() }
  }

  try {
    val (columnProfiles, constraintSuggestions) =
      try {
        val profilingResult = ConstraintSuggestionRunner().profileAndSuggest(
          trainingData,
          constraintRules,
          restrictToColumns,
          lowCardinalityHistogramThreshold,
          printStatusUpdates,
          metricsRepositoryOptions,
          kllParameters,
          predefinedTypes
        )

        saveColumnProfilesJsonToFileSystemIfNecessary(
          fileOutputOptions,
          printStatusUpdates,
          profilingResult._1
        )

        profilingResult
      } finally {
        // The training data is no longer read after profiling; release it early
        // (and also when profiling or saving the profiles fails).
        if (cacheInputs) {
          trainingData.unpersist()
        }
      }

    saveConstraintSuggestionJsonToFileSystemIfNecessary(
      fileOutputOptions,
      printStatusUpdates,
      constraintSuggestions
    )

    val verificationResult = evaluateConstraintsIfNecessary(
      testData,
      printStatusUpdates,
      constraintSuggestions,
      fileOutputOptions
    )

    // Strict grouping; avoids the lazy (non-serializable) view returned by `mapValues`
    val columnsWithSuggestions = constraintSuggestions.groupBy { _.columnName }

    ConstraintSuggestionResult(columnProfiles.profiles, columnProfiles.numRecords,
      columnsWithSuggestions, verificationResult)
  } finally {
    // The test data was cached above as well, so it has to be released here;
    // previously it stayed cached after the run.
    if (cacheInputs) {
      testData.foreach { _.unpersist() }
    }
  }
}