private def streamIngestIntoOnlineStore()

in scala-spark-sdk/src/main/scala/software/amazon/sagemaker/featurestore/sparksdk/FeatureStoreManager.scala [159:203]


  /** Streams every row of `inputDataFrame` into the online store of feature group `featureGroupName`.
    *
    * The input is first repartitioned with `DataFrameRepartitioner`. Each partition then does three things:
    *  - initializes `ClientFactory` for `region` (with `assumeRoleArn`);
    *  - builds a feature store runtime client;
    *  - hands its rows to `putOnlineRecordsForPartition`.
    *
    * The rows emitted by that call follow the input schema plus one extra nullable string column,
    * `ONLINE_INGESTION_ERROR_FILED_NAME`. Rows where that column is non-null are kept, cached and published
    * through `failedStreamIngestionDataFrame`. Presumably this column is populated only for records whose put
    * failed; verify against `putOnlineRecordsForPartition`.
    *
    * @param featureGroupName name of the feature group to ingest into
    * @param inputDataFrame   rows to ingest
    * @param targetStores     forwarded unchanged to `putOnlineRecordsForPartition`
    * @param region           region passed to `ClientFactory.initialize` inside every partition
    * @throws StreamIngestionFailureException if at least one row comes back with a non-null error field
    */
  private def streamIngestIntoOnlineStore(
      featureGroupName: String,
      inputDataFrame: DataFrame,
      targetStores: List[TargetStore],
      region: String
  ): Unit = {
    val columns                = inputDataFrame.schema.names
    val repartitionedDataFrame = DataFrameRepartitioner.repartition(inputDataFrame)

    // Add extra field for reporting online ingestion failures.
    val castWithExceptionSchema = StructType(
      repartitionedDataFrame.schema.fields ++ Array(StructField(ONLINE_INGESTION_ERROR_FILED_NAME, StringType, true))
    )
    // The error field is appended last, so its position is fixed. Resolve it once on the driver instead of
    // doing a name lookup for every row inside the filter closure.
    val errorFieldIndex = castWithExceptionSchema.fields.length - 1

    // Encoder needs to be defined during transformation because the original schema is changed.
    // The dataframe has to be cached otherwise the input dataset will be re-ingested when customer perform spark
    // actions on failedStreamIngestionDataFrame.
    // NOTE(review): cache() is best-effort. If cached blocks are lost (e.g. an executor dies), Spark recomputes
    // the lineage, which calls putOnlineRecordsForPartition again for the affected partitions.
    val failedRecords = repartitionedDataFrame
      .mapPartitions(partition => {
        ClientFactory.initialize(region, assumeRoleArn)

        putOnlineRecordsForPartition(
          partition,
          featureGroupName,
          columns,
          targetStores,
          ClientFactory.sageMakerFeatureStoreRuntimeClientBuilder.build()
        )
      })(RowEncoder(castWithExceptionSchema))
      .filter(row => !row.isNullAt(errorFieldIndex))
      .cache()

    // Published before the action/throw below so callers can inspect the failed rows afterwards.
    // NOTE(review): the cached data is not unpersisted here; presumably the caller owns its lifetime.
    failedStreamIngestionDataFrame = Some(failedRecords)

    // MapPartitions and Map are lazily evaluated by spark, so action is needed here to ensure ingestion is executed
    // For more info: https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions
    val failedOnlineIngestionDataFrameSize = failedRecords.count()

    if (failedOnlineIngestionDataFrameSize > 0) {
      throw StreamIngestionFailureException(
        s"Stream ingestion finished, however ${failedOnlineIngestionDataFrameSize} records failed to be ingested. Please inspect failed stream ingestion data frame for more info."
      )
    }
  }