in scala-spark-sdk/src/main/scala/software/amazon/sagemaker/featurestore/sparksdk/validators/InputDataSchemaValidator.scala [196:222]
/** Builds one validation column per feature in `dataTypeValidatorMap`.
 *
 *  Each returned column evaluates to the string `"<featureName> not valid"` for rows whose value
 *  fails validation, and to `null` otherwise. A value fails validation when:
 *    1. it is non-null but the feature's conversion yields null (i.e. the data cannot be cast to
 *       the type specified in the feature definition);
 *    2. it is NaN;
 *    3. it is null and the feature is the event time feature;
 *    4. it is null and the feature is the record identifier.
 *
 *  @param dataTypeValidatorMap feature name mapped to a function that, given the column name,
 *                              returns the converted column for that feature
 *  @param recordIdentifierName name of the record identifier feature (must not be null)
 *  @param eventTimeFeatureName name of the event time feature (must not be null)
 *  @return the validation columns, in the iteration order of `dataTypeValidatorMap`
 */
private def getSchemaDataTypeValidatorColumn(
    dataTypeValidatorMap: Map[String, String => Column],
    recordIdentifierName: String,
    eventTimeFeatureName: String
): List[Column] = {
  // A single pass over the map followed by `toList` avoids the O(n) `List :+` append that a
  // fold-with-append would perform for every feature.
  dataTypeValidatorMap.iterator.map { case (featureName, conversion) =>
    val featureColumn = col(featureName)

    // Conditions 1 and 2: apply to every feature.
    val castFailedOrNaN =
      conversion(featureName).isNull && featureColumn.isNotNull || featureColumn.isNaN

    // Conditions 3 and 4: null is only rejected for the record identifier and event time.
    val mustNotBeNull =
      featureName == recordIdentifierName || featureName == eventTimeFeatureName

    val isInvalid =
      if (mustNotBeNull) castFailedOrNaN || featureColumn.isNull
      else castFailedOrNaN

    when(isInvalid, lit(s"$featureName not valid")).otherwise(lit(null))
  }.toList
}