def validateSchemaNames()

in scala-spark-sdk/src/main/scala/software/amazon/sagemaker/featurestore/sparksdk/validators/InputDataSchemaValidator.scala [98:153]


  /** Validates the column names of an input schema against a feature group description.
    *
    * Checks are performed in this order, and the first failing check throws:
    *   1. Per name, in schema order: the name contains none of the characters `,;{}()\n\t=`,
    *      and the name is not one of `RESERVED_FEATURE_NAMES`.
    *   1. Every name is the name of a feature definition in `describeResponse`.
    *   1. Both `recordIdentifierName` and `eventTimeFeatureName` appear among the names.
    *
    * @param schemaNames          column names of the input schema
    * @param describeResponse     feature group description whose feature definitions supply the
    *                             set of known feature names
    * @param recordIdentifierName name of the record identifier feature; must be in `schemaNames`
    * @param eventTimeFeatureName name of the event time feature; must be in `schemaNames`
    * @throws ValidationError if any of the checks above fails
    */
  def validateSchemaNames(
      schemaNames: Array[String],
      describeResponse: DescribeFeatureGroupResponse,
      recordIdentifierName: String,
      eventTimeFeatureName: String
  ): Unit = {
    // Regex character class; "\n" and "\t" are real newline/tab characters in this string.
    val invalidCharSet        = "[,;{}()\n\t=]"
    val invalidCharSetPattern = Pattern.compile(invalidCharSet)

    val knownFeatureNames = describeResponse
      .featureDefinitions()
      .asScala
      .map(_.featureName())
      .toSet

    schemaNames.foreach { name =>
      // Verify there are no invalid characters ",;{}()\n\t=" in the schema names.
      // find() looks for an invalid character anywhere in the name; matches() would only
      // succeed when the whole name is exactly one invalid character.
      if (invalidCharSetPattern.matcher(name).find()) {
        throw ValidationError(
          s"Cannot proceed. Invalid char among '$invalidCharSet' detected in '$name'."
        )
      }

      // Verify there is no reserved feature name.
      if (RESERVED_FEATURE_NAMES.contains(name)) {
        throw ValidationError(
          s"Cannot proceed. Detected column with reserved feature name '$name'."
        )
      }
    }

    // Verify there is no unknown column. Schema order (and duplicates) are kept for the message.
    val unknownColumns = schemaNames.filterNot(knownFeatureNames.contains)
    if (unknownColumns.nonEmpty) {
      throw ValidationError(
        s"Cannot proceed. Schema contains unknown columns: '${unknownColumns.mkString(",")}'"
      )
    }

    // Verify all required feature names are present in schema.
    val missingRequiredFeatureNames =
      Set(recordIdentifierName, eventTimeFeatureName).diff(schemaNames.toSet)
    if (missingRequiredFeatureNames.nonEmpty) {
      throw ValidationError(
        s"Cannot proceed. Missing feature names '${missingRequiredFeatureNames.mkString(",")}' in schema."
      )
    }
  }