in s3/src/main/scala/org/apache/pekko/stream/connectors/s3/settings.scala [499:643]
def apply(c: Config): S3Settings = {
val bufferType = c.getString("buffer") match {
case "memory" =>
MemoryBufferType
case "disk" =>
val diskBufferPath = c.getString("disk-buffer-path")
DiskBufferType(Paths.get(diskBufferPath))
case other =>
throw new IllegalArgumentException(s"Buffer type must be 'memory' or 'disk'. Got: [$other]")
}
val maybeProxy = for {
host <- Try(c.getString("proxy.host")).toOption if host.nonEmpty
} yield {
Proxy(
host,
c.getInt("proxy.port"),
Uri.httpScheme(c.getBoolean("proxy.secure")))
}
val maybeForwardProxy =
if (c.hasPath("forward-proxy") && c.hasPath("forward-proxy.host") && c.hasPath("forward-proxy.port"))
Some(ForwardProxy(c.getConfig("forward-proxy")))
else None
if (c.hasPath("path-style-access"))
log.warn(
"The deprecated 'path-style-access' property was used to specify access style. Please use 'access-style' instead.")
val deprecatedPathAccessStyleSetting = Try(c.getString("path-style-access")).toOption
val accessStyle = deprecatedPathAccessStyleSetting match {
case None | Some("") =>
c.getString("access-style") match {
case "virtual" => VirtualHostAccessStyle
case "path" => PathAccessStyle
case other =>
throw new IllegalArgumentException(s"'access-style' must be 'virtual' or 'path'. Got: [$other]")
}
case Some("true") | Some("force") => PathAccessStyle
case Some("false") => VirtualHostAccessStyle
case Some(other) =>
throw new IllegalArgumentException(
s"'path-style-access' must be 'false', 'true' or 'force'. Got: [$other]. Prefer using access-style instead.")
}
val endpointUrl = if (c.hasPath("endpoint-url")) {
Option(c.getString("endpoint-url"))
} else {
None
}.orElse(maybeProxy.map(p => s"${p.scheme}://${p.host}:${p.port}"))
if (endpointUrl.isEmpty && accessStyle == PathAccessStyle)
log.warn(
s"""It appears you are attempting to use AWS S3 with path-style access.
|Amazon had planned not to support path-style access to buckets created after September 30, 2020;
|see (https://aws.amazon.com/blogs/aws/amazon-s3-path-deprecation-plan-the-rest-of-the-story/).
|(Update, Sept 23, 2020) AWS has delayed this change indefinitely to give customers more time to
|migrate to virtual host-style access.
|
|Enable virtual host-style access by unsetting `$ConfigPath.path-style-access`,
|and leaving `$ConfigPath.access-style` on the default `virtual`.
|
|If your S3 provider is not AWS, you need to set `$ConfigPath.endpoint-url`.""".stripMargin)
val regionProvider = {
val regionProviderPath = "aws.region.provider"
val staticRegionProvider = new AwsRegionProvider {
lazy val getRegion: Region = Region.of(c.getString("aws.region.default-region"))
}
if (c.hasPath(regionProviderPath)) {
c.getString(regionProviderPath) match {
case "static" =>
staticRegionProvider
case _ =>
new DefaultAwsRegionProviderChain()
}
} else {
new DefaultAwsRegionProviderChain()
}
}
val credentialsProvider = {
val credProviderPath = "aws.credentials.provider"
if (c.hasPath(credProviderPath)) {
c.getString(credProviderPath) match {
case "default" =>
DefaultCredentialsProvider.create()
case "static" =>
val aki = c.getString("aws.credentials.access-key-id")
val sak = c.getString("aws.credentials.secret-access-key")
val tokenPath = "aws.credentials.token"
val creds: AwsCredentials = if (c.hasPath(tokenPath)) {
AwsSessionCredentials.create(aki, sak, c.getString(tokenPath))
} else {
AwsBasicCredentials.create(aki, sak)
}
StaticCredentialsProvider.create(creds)
case "anon" =>
AnonymousCredentialsProvider.create()
case _ =>
DefaultCredentialsProvider.create()
}
} else {
DefaultCredentialsProvider.create()
}
}
val apiVersion = Try(c.getInt("list-bucket-api-version") match {
case 1 => ApiVersion.ListBucketVersion1
case 2 => ApiVersion.ListBucketVersion2
}).getOrElse(ApiVersion.ListBucketVersion2)
val validateObjectKey = c.getBoolean("validate-object-key")
val retrySettings = RetrySettings(c.getConfig("retry-settings"))
val multipartUploadConfig = c.getConfig("multipart-upload")
val multipartUploadSettings = MultipartUploadSettings(
RetrySettings(multipartUploadConfig.getConfig("retry-settings")))
val signAnonymousRequests = c.getBoolean("sign-anonymous-requests")
new S3Settings(
bufferType,
credentialsProvider,
regionProvider,
accessStyle,
endpointUrl,
apiVersion,
maybeForwardProxy,
validateObjectKey,
retrySettings,
multipartUploadSettings,
signAnonymousRequests)
}