in scala-spark-sdk/src/main/scala/software/amazon/sagemaker/featurestore/sparksdk/helpers/SparkSessionInitializer.scala [30:83]
def initializeSparkSessionForOfflineStore(
    sparkSession: SparkSession,
    offlineStoreEncryptionKmsKeyId: String,
    assumeRoleArn: String,
    region: String
): Unit = {
  // Initialize the Hadoop configuration based on whether an assume-role ARN is provided:
  // 1. If no role ARN is provided, use the default credential provider chain.
  //    For more info: https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html
  // 2. If a role ARN is provided, use the assumed-role credential provider instead.
  //    For more info: https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/assumed_roles.html
  // Usage sketches for both paths follow this method.
  val local_credentials_provider = List(
    "com.amazonaws.auth.ContainerCredentialsProvider",
    "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
  ).mkString(",")
  if (assumeRoleArn == null) {
    sparkSession.sparkContext.hadoopConfiguration
      .set("fs.s3a.aws.credentials.provider", local_credentials_provider)
  } else {
    val credentials_provider = "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider"
    sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", credentials_provider)
    sparkSession.sparkContext.hadoopConfiguration
      .set("fs.s3a.assumed.role.credentials.provider", local_credentials_provider)
    sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.assumed.role.arn", assumeRoleArn)
  }
  // Suppress _SUCCESS marker files and Parquet summary metadata files so that
  // only data files are written to the offline store S3 prefix.
  sparkSession.sparkContext.hadoopConfiguration
    .set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
  sparkSession.sparkContext.hadoopConfiguration
    .set("parquet.summary.metadata.level", "NONE")
  // Feature Store uses SSE-KMS to encrypt data files written to S3. If no KMS
  // key ID is specified, the AWS managed CMK for S3 is used by default.
  // For more info: https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingKMSEncryption.html
  sparkSession.sparkContext.hadoopConfiguration
    .set("fs.s3a.server-side-encryption-algorithm", "SSE-KMS")
  if (offlineStoreEncryptionKmsKeyId != null) {
    sparkSession.sparkContext.hadoopConfiguration.set(
      "fs.s3a.server-side-encryption.key",
      offlineStoreEncryptionKmsKeyId
    )
  }
  // Regions in the China partition use the .com.cn S3 endpoint.
  if (region.startsWith("cn-")) {
    sparkSession.sparkContext.hadoopConfiguration
      .set("fs.s3a.endpoint", s"s3.$region.amazonaws.com.cn")
  } else {
    sparkSession.sparkContext.hadoopConfiguration
      .set("fs.s3a.endpoint", s"s3.$region.amazonaws.com")
  }
}
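
A minimal usage sketch of the default-credentials path, assuming SparkSessionInitializer is an object so the method can be called directly; the app name, KMS key ARN, and region below are placeholders, not values taken from the SDK:

import org.apache.spark.sql.SparkSession
import software.amazon.sagemaker.featurestore.sparksdk.helpers.SparkSessionInitializer

val spark = SparkSession.builder().appName("feature-store-offline-example").getOrCreate()

// Passing null for assumeRoleArn selects the default credential provider chain,
// and the placeholder KMS key ARN is applied as the SSE-KMS key for S3A writes.
SparkSessionInitializer.initializeSparkSessionForOfflineStore(
  sparkSession = spark,
  offlineStoreEncryptionKmsKeyId = "arn:aws:kms:us-east-1:123456789012:key/example-key-id",
  assumeRoleArn = null,
  region = "us-east-1"
)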
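
And a sketch of the assume-role path, continuing from the session above; the role ARN is a placeholder, and a cn- region is chosen to exercise the China-partition endpoint branch:

// Placeholder role ARN; a null KMS key ID falls back to the AWS managed key for S3.
SparkSessionInitializer.initializeSparkSessionForOfflineStore(
  sparkSession = spark,
  offlineStoreEncryptionKmsKeyId = null,
  assumeRoleArn = "arn:aws-cn:iam::123456789012:role/example-offline-store-role",
  region = "cn-north-1"
)

// The initializer now points S3A at the China-partition endpoint and routes
// S3 access through the assumed role:
assert(spark.sparkContext.hadoopConfiguration.get("fs.s3a.endpoint") == "s3.cn-north-1.amazonaws.com.cn")
assert(spark.sparkContext.hadoopConfiguration.get("fs.s3a.assumed.role.arn") == "arn:aws-cn:iam::123456789012:role/example-offline-store-role")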