private def batchIngestIntoOfflineStore()

in scala-spark-sdk/src/main/scala/software/amazon/sagemaker/featurestore/sparksdk/FeatureStoreManager.scala [250:321]


  private def batchIngestIntoOfflineStore(
      dataFrame: DataFrame,
      describeResponse: DescribeFeatureGroupResponse,
      eventTimeFeatureName: String,
      region: String
  ): Unit = {

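    // Guard: batch ingestion requires the feature group's offline store to be enabled.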
    if (!isFeatureGroupOfflineStoreEnabled(describeResponse)) {
      throw ValidationError(
        s"OfflineStore of FeatureGroup: '${describeResponse.featureGroupName()}' is not enabled."
      )
    }

    val offlineStoreEncryptionKeyId =
      describeResponse.offlineStoreConfig().s3StorageConfig().kmsKeyId()
    val tableFormat         = describeResponse.offlineStoreConfig().tableFormat()
    val destinationFilePath = generateDestinationFilePath(describeResponse)
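    // Add the Feature Store audit columns (api_invocation_time, write_time, is_deleted)
    // that the offline store schema expects alongside the user-provided features.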
    val tempDataFrame = dataFrame
      .withColumn("api_invocation_time", current_timestamp())
      .withColumn("write_time", current_timestamp())
      .withColumn("is_deleted", lit(false))

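    // Iceberg table format: write through the Spark catalog that fronts the table.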
    if (isIcebergTableEnabled(describeResponse)) {
      val resolvedOutputS3Uri = describeResponse.offlineStoreConfig().s3StorageConfig().resolvedOutputS3Uri()
      val dataCatalogName     = describeResponse.offlineStoreConfig().dataCatalogConfig().catalog().toLowerCase()
      val dataBaseName        = describeResponse.offlineStoreConfig().dataCatalogConfig().database().toLowerCase()
      val tableName           = describeResponse.offlineStoreConfig().dataCatalogConfig().tableName().toLowerCase()

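      // Configure the Spark session with the Iceberg catalog, resolved S3 output
      // location, and the offline store's KMS key.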
      SparkSessionInitializer.initializeSparkSessionForIcebergTable(
        dataFrame.sparkSession,
        offlineStoreEncryptionKeyId,
        resolvedOutputS3Uri,
        dataCatalogName,
        assumeRoleArn,
        region
      )

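      // Sort within partitions by event time so the appended Iceberg data files
      // are clustered on the event time feature.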
      tempDataFrame
        .sortWithinPartitions(col(eventTimeFeatureName))
        .writeTo(f"$dataCatalogName.$dataBaseName.`$tableName`")
        .option("compression", "none")
        .append()
    } else if (isGlueTableEnabled(describeResponse) || tableFormat == null) {
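      // Glue (Hive-style) table format; also taken when no table format is set.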
      SparkSessionInitializer.initializeSparkSessionForOfflineStore(
        dataFrame.sparkSession,
        offlineStoreEncryptionKeyId,
        assumeRoleArn,
        region
      )

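      // Derive Hive-style year/month/day/hour partition columns from the event time feature.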
      val offlineDataFrame = tempDataFrame
        .withColumn("temp_event_time_col", col(eventTimeFeatureName).cast("Timestamp"))
        .withColumn("year", date_format(col("temp_event_time_col"), "yyyy"))
        .withColumn("month", date_format(col("temp_event_time_col"), "MM"))
        .withColumn("day", date_format(col("temp_event_time_col"), "dd"))
        .withColumn("hour", date_format(col("temp_event_time_col"), "HH"))
        .drop("temp_event_time_col")

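      // Repartition on the partition columns so each output partition is written together,
      // then append uncompressed Parquet files under the resolved destination S3 prefix.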
      offlineDataFrame
        .repartition(col("year"), col("month"), col("day"), col("hour"))
        .write
        .partitionBy("year", "month", "day", "hour")
        .option("compression", "none")
        .mode("append")
        .parquet(destinationFilePath)
    } else {
      throw new RuntimeException(
        f"Invalid table format '$tableFormat' detected; it is not supported by the feature store Spark connector."
      )
    }
  }
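
For reference, here is a minimal self-contained sketch of the year/month/day/hour partition derivation used in the Glue branch above. The session setup, object name, and sample columns (record_id, event_time) are illustrative assumptions, not part of the SDK:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.{col, date_format}

  object PartitionSketch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().master("local[*]").appName("partition-sketch").getOrCreate()
      import spark.implicits._

      // Hypothetical input: records keyed by an ISO-8601 event time string.
      val df = Seq(
        ("record-1", "2023-05-01T12:34:56Z"),
        ("record-2", "2023-05-01T13:00:00Z")
      ).toDF("record_id", "event_time")

      // Same derivation as the Glue branch: cast the event time to a timestamp,
      // then format out the year/month/day/hour partition columns.
      val partitioned = df
        .withColumn("temp_event_time_col", col("event_time").cast("Timestamp"))
        .withColumn("year", date_format(col("temp_event_time_col"), "yyyy"))
        .withColumn("month", date_format(col("temp_event_time_col"), "MM"))
        .withColumn("day", date_format(col("temp_event_time_col"), "dd"))
        .withColumn("hour", date_format(col("temp_event_time_col"), "HH"))
        .drop("temp_event_time_col")

      partitioned.show(truncate = false)
      spark.stop()
    }
  }

Partitioning on these four columns yields an S3 layout of the form year=.../month=.../day=.../hour=..., which keeps time-range scans of the offline store cheap.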