in scala-spark-sdk/src/main/scala/software/amazon/sagemaker/featurestore/sparksdk/FeatureStoreManager.scala [250:321]
private def batchIngestIntoOfflineStore(
    dataFrame: DataFrame,
    describeResponse: DescribeFeatureGroupResponse,
    eventTimeFeatureName: String,
    region: String
): Unit = {
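  // Batch ingestion into the offline store is only valid when the feature group has an offline store enabled.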
  if (!isFeatureGroupOfflineStoreEnabled(describeResponse)) {
    throw ValidationError(
      s"OfflineStore of FeatureGroup: '${describeResponse.featureGroupName()}' is not enabled."
    )
  }
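
  // Read the offline store settings (KMS key, table format, S3 destination) from the DescribeFeatureGroup response.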
  val offlineStoreEncryptionKeyId =
    describeResponse.offlineStoreConfig().s3StorageConfig().kmsKeyId()
  val tableFormat = describeResponse.offlineStoreConfig().tableFormat()
  val destinationFilePath = generateDestinationFilePath(describeResponse)
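
  // Stamp every record with the offline store bookkeeping columns: api_invocation_time, write_time and is_deleted.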
  val tempDataFrame = dataFrame
    .withColumn("api_invocation_time", current_timestamp())
    .withColumn("write_time", current_timestamp())
    .withColumn("is_deleted", lit(false))
  if (isIcebergTableEnabled(describeResponse)) {
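    // Iceberg table format: resolve the catalog, database and table configured for the feature group.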
    val resolvedOutputS3Uri = describeResponse.offlineStoreConfig().s3StorageConfig().resolvedOutputS3Uri()
    val dataCatalogName = describeResponse.offlineStoreConfig().dataCatalogConfig().catalog().toLowerCase()
    val dataBaseName = describeResponse.offlineStoreConfig().dataCatalogConfig().database().toLowerCase()
    val tableName = describeResponse.offlineStoreConfig().dataCatalogConfig().tableName().toLowerCase()
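
    // Configure the Spark session for Iceberg writes, wiring in the encryption key, output location and assumed role.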
    SparkSessionInitializer.initializeSparkSessionForIcebergTable(
      dataFrame.sparkSession,
      offlineStoreEncryptionKeyId,
      resolvedOutputS3Uri,
      dataCatalogName,
      assumeRoleArn,
      region
    )
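
    // Append to the Iceberg table, sorting records within each partition by the event-time feature.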
    tempDataFrame
      .sortWithinPartitions(col(eventTimeFeatureName))
      .writeTo(f"$dataCatalogName.$dataBaseName.`$tableName`")
      .option("compression", "none")
      .append()
  } else if (isGlueTableEnabled(describeResponse) || tableFormat == null) {
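    // Glue table format (or no table format set): write partitioned parquet files directly to the offline store S3 location.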
    SparkSessionInitializer.initializeSparkSessionForOfflineStore(
      dataFrame.sparkSession,
      offlineStoreEncryptionKeyId,
      assumeRoleArn,
      region
    )
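
    // Derive year/month/day/hour partition columns from the event-time feature, then drop the temporary column.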
    val offlineDataFrame = tempDataFrame
      .withColumn("temp_event_time_col", col(eventTimeFeatureName).cast("Timestamp"))
      .withColumn("year", date_format(col("temp_event_time_col"), "yyyy"))
      .withColumn("month", date_format(col("temp_event_time_col"), "MM"))
      .withColumn("day", date_format(col("temp_event_time_col"), "dd"))
      .withColumn("hour", date_format(col("temp_event_time_col"), "HH"))
      .drop("temp_event_time_col")
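
    // Repartition by the partition columns and append uncompressed parquet, partitioned by year/month/day/hour.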
    offlineDataFrame
      .repartition(col("year"), col("month"), col("day"), col("hour"))
      .write
      .partitionBy("year", "month", "day", "hour")
      .option("compression", "none")
      .mode("append")
      .parquet(destinationFilePath)
  } else {
    throw new RuntimeException(
      f"Table format '$tableFormat' is not supported by the feature store Spark connector."
    )
  }
}