in scala-spark-sdk/src/main/scala/software/amazon/sagemaker/featurestore/sparksdk/FeatureStoreManager.scala [159:203]
private def streamIngestIntoOnlineStore(
    featureGroupName: String,
    inputDataFrame: DataFrame,
    targetStores: List[TargetStore],
    region: String
): Unit = {
  val columns = inputDataFrame.schema.names
  val repartitionedDataFrame = DataFrameRepartitioner.repartition(inputDataFrame)

  // Add an extra string field used to report per-record online ingestion failures
  val castWithExceptionSchema = StructType(
    repartitionedDataFrame.schema.fields ++ Array(StructField(ONLINE_INGESTION_ERROR_FILED_NAME, StringType, nullable = true))
  )
  val fieldIndexMap = castWithExceptionSchema.fieldNames.zipWithIndex.toMap

  // The encoder has to be supplied explicitly for this transformation because the original schema has changed.
  // The DataFrame also has to be cached; otherwise the input dataset would be re-ingested every time the
  // customer performs a Spark action on failedStreamIngestionDataFrame.
  failedStreamIngestionDataFrame = Option(
    repartitionedDataFrame
      .mapPartitions(partition => {
        ClientFactory.initialize(region, assumeRoleArn)
        putOnlineRecordsForPartition(
          partition,
          featureGroupName,
          columns,
          targetStores,
          ClientFactory.sageMakerFeatureStoreRuntimeClientBuilder.build()
        )
      })(RowEncoder(castWithExceptionSchema))
      .filter(row => row.getAs[String](fieldIndexMap(ONLINE_INGESTION_ERROR_FILED_NAME)) != null)
      .cache()
  )

  // mapPartitions and map are lazily evaluated by Spark, so an action is needed here to ensure
  // the ingestion actually runs.
  // For more info: https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions
  val failedOnlineIngestionDataFrameSize = failedStreamIngestionDataFrame.get.count()
  if (failedOnlineIngestionDataFrameSize > 0) {
    throw StreamIngestionFailureException(
      s"Stream ingestion finished, but ${failedOnlineIngestionDataFrameSize} records failed to be ingested. " +
        "Please inspect the failed stream ingestion DataFrame for more info."
    )
  }
}
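
For context, putOnlineRecordsForPartition is defined elsewhere in FeatureStoreManager.scala and is not shown in this excerpt. The sketch below is a hypothetical approximation (hence the "Sketch" suffix) of what such a partition-level helper could look like against the AWS SDK v2 Feature Store runtime client: it would call PutRecord per row and append the error message (or null on success) as the extra column, so that the upstream filter on ONLINE_INGESTION_ERROR_FILED_NAME keeps only failed rows. The row-to-FeatureValue conversion and error capture here are assumptions, not the library's actual implementation.

// Hypothetical sketch only; the real putOnlineRecordsForPartition may differ.
import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import software.amazon.awssdk.services.sagemakerfeaturestoreruntime.SageMakerFeatureStoreRuntimeClient
import software.amazon.awssdk.services.sagemakerfeaturestoreruntime.model.{FeatureValue, PutRecordRequest, TargetStore}

private def putOnlineRecordsForPartitionSketch(
    partition: Iterator[Row],
    featureGroupName: String,
    columns: Array[String],
    targetStores: List[TargetStore],
    client: SageMakerFeatureStoreRuntimeClient
): Iterator[Row] = {
  partition.map { row =>
    // Build one FeatureValue per non-null column of the input row.
    val record = columns.flatMap { col =>
      Option(row.getAs[Any](col)).map { value =>
        FeatureValue.builder().featureName(col).valueAsString(value.toString).build()
      }
    }.toList.asJava
    val error =
      try {
        client.putRecord(
          PutRecordRequest
            .builder()
            .featureGroupName(featureGroupName)
            .record(record)
            .targetStores(targetStores.asJava)
            .build()
        )
        null // success: the error column stays null, so the row is filtered out upstream
      } catch {
        case e: Exception => e.getMessage // failure: surface the message in the extra column
      }
    // Append the error column so the row matches castWithExceptionSchema.
    Row.fromSeq(row.toSeq :+ error)
  }
}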
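
Because the failed-records DataFrame is cached before count() runs, a caller can inspect it after catching the exception without re-triggering the ingestion. A hedged usage sketch, assuming a public entry point named ingestData and an accessor named getFailedStreamIngestionDataFrame returning the Option[DataFrame] above (both names are assumptions in this sketch):

try {
  featureStoreManager.ingestData(inputDf, featureGroupArn)
} catch {
  case e: StreamIngestionFailureException =>
    // Safe to inspect: the frame was cached, so this does not re-run the ingestion.
    featureStoreManager.getFailedStreamIngestionDataFrame.foreach(_.show(truncate = false))
}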