in scala-spark-sdk/src/main/scala/software/amazon/sagemaker/featurestore/sparksdk/validators/InputDataSchemaValidator.scala [155:179]
/**
 * Builds a map from feature name to a function that casts a column to the Spark type mapped
 * (via `TYPE_MAP`) from that feature's declared Feature Store type.
 *
 * The event time feature is always included, whether or not it appears in `dataFrame`. Its
 * function additionally casts the column to `TimestampType`. Every other feature is included
 * only if `dataFrame`'s schema has a column with exactly that name (case-sensitive match).
 * Features in neither category are omitted.
 *
 * @param dataFrame            input data; its schema decides which non-event-time features are converted
 * @param featureDefinitions   feature definitions supplying each feature's name and type
 * @param eventTimeFeatureName name of the event time feature
 * @return map from feature name to a function that takes a column name and returns the cast `Column`
 */
private def getSchemaDataTypeValidatorMap(
  dataFrame: DataFrame,
  featureDefinitions: List[FeatureDefinition],
  eventTimeFeatureName: String
): Map[String, String => Column] = {
  // Resolve the column names once. `schema.names` allocates a new array on every call, and
  // `Array.contains` is a linear scan, so checking it per feature was O(features * columns).
  val dataFrameColumnNames: Set[String] = dataFrame.schema.names.toSet

  val castTo = (sparkType: String) => (columnName: String) => col(columnName).cast(sparkType)

  featureDefinitions.foldLeft(Map.empty[String, String => Column]) { (resultMap, featureDefinition) =>
    val featureName = featureDefinition.featureName()
    // NOTE(review): the type lookup runs before the presence check. If TYPE_MAP is a plain Map,
    // an unmapped type fails even for features absent from the DataFrame (original behavior kept).
    val sparkType = TYPE_MAP(featureDefinition.featureTypeAsString())

    if (featureName == eventTimeFeatureName) {
      // Event time is cast to its mapped type first, then to TimestampType.
      resultMap + (eventTimeFeatureName -> ((columnName: String) =>
        castTo(sparkType)(columnName).cast(TimestampType)))
    } else if (dataFrameColumnNames.contains(featureName)) {
      resultMap + (featureName -> castTo(sparkType))
    } else {
      resultMap
    }
  }
}