in scala-spark-sdk/src/main/scala/software/amazon/sagemaker/featurestore/sparksdk/validators/InputDataSchemaValidator.scala [98:153]
def validateSchemaNames(
    schemaNames: Array[String],
    describeResponse: DescribeFeatureGroupResponse,
    recordIdentifierName: String,
    eventTimeFeatureName: String
): Unit = {
  val invalidCharSet = "[,;{}()\n\t=]"
  val invalidCharSetPattern = Pattern.compile(invalidCharSet)
  val unknownColumns = ListBuffer[String]()
  var missingRequiredFeatureNames = Set(recordIdentifierName, eventTimeFeatureName)
  val features = describeResponse
    .featureDefinitions()
    .asScala
    .map(feature => feature.featureName())
    .toSet
  for (name <- schemaNames) {
    // Verify there are no invalid characters ",;{}()\n\t=" in the schema names.
    if (invalidCharSetPattern.matcher(name).find()) {
      throw ValidationError(
        s"Cannot proceed. Invalid char among '$invalidCharSet' detected in '$name'."
      )
    }
    // Verify there is no reserved feature name.
    if (RESERVED_FEATURE_NAMES.contains(name)) {
      throw ValidationError(
        s"Cannot proceed. Detected column with reserved feature name '$name'."
      )
    }
    // Collect columns that are not defined in the feature group.
    if (!features.contains(name)) {
      unknownColumns += name
    }
    // Tick off required feature names as they appear in the schema.
    if (missingRequiredFeatureNames.contains(name)) {
      missingRequiredFeatureNames -= name
    }
  }
  // Verify there is no unknown column.
  if (unknownColumns.nonEmpty) {
    throw ValidationError(
      s"Cannot proceed. Schema contains unknown columns: '${unknownColumns.mkString(",")}'"
    )
  }
  // Verify all required feature names are present in schema.
  if (missingRequiredFeatureNames.nonEmpty) {
    throw ValidationError(
      s"Cannot proceed. Missing feature names '${missingRequiredFeatureNames.mkString(",")}' in schema."
    )
  }
}
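
A minimal usage sketch, assuming the AWS SDK for Java v2 SageMaker model builders and a hypothetical Spark DataFrame `df` whose columns should line up with the feature group. The validator throws ValidationError immediately for an invalid character or reserved name, and after the loop for unknown columns or missing required features.

import software.amazon.awssdk.services.sagemaker.model.{DescribeFeatureGroupResponse, FeatureDefinition, FeatureType}

// Hand-built response standing in for a real DescribeFeatureGroup call (assumed feature names).
val describeResponse: DescribeFeatureGroupResponse = DescribeFeatureGroupResponse
  .builder()
  .featureDefinitions(
    FeatureDefinition.builder().featureName("record_id").featureType(FeatureType.STRING).build(),
    FeatureDefinition.builder().featureName("event_time").featureType(FeatureType.STRING).build(),
    FeatureDefinition.builder().featureName("value").featureType(FeatureType.FRACTIONAL).build()
  )
  .build()

// df.columns supplies the Array[String] of schema names from the hypothetical DataFrame.
validateSchemaNames(df.columns, describeResponse, "record_id", "event_time")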