def apply()

in s3/src/main/scala/org/apache/pekko/stream/connectors/s3/settings.scala [499:643]


  /**
   * Builds [[S3Settings]] from the given configuration section.
   *
   * Keys read from `c`: `buffer`, `disk-buffer-path` (only for the `disk` buffer), `proxy.*`,
   * `forward-proxy.*`, the deprecated `path-style-access`, `access-style`, `endpoint-url`,
   * `aws.region.*`, `aws.credentials.*`, `list-bucket-api-version`, `validate-object-key`,
   * `retry-settings`, `multipart-upload.retry-settings` and `sign-anonymous-requests`.
   *
   * @param c the configuration section holding the S3 connector settings
   * @return the settings assembled from `c`
   * @throws IllegalArgumentException if `buffer` is neither `memory` nor `disk`, if `access-style` is
   *                                  neither `virtual` nor `path`, or if the deprecated
   *                                  `path-style-access` is set to something other than `false`,
   *                                  `true`, `force` or the empty string
   */
  def apply(c: Config): S3Settings = {

    val bufferType = c.getString("buffer") match {
      case "memory" =>
        MemoryBufferType

      case "disk" =>
        DiskBufferType(Paths.get(c.getString("disk-buffer-path")))

      case other =>
        throw new IllegalArgumentException(s"Buffer type must be 'memory' or 'disk'. Got: [$other]")
    }

    // A proxy is only configured when `proxy.host` can be read and is non-empty.
    val maybeProxy = Try(c.getString("proxy.host")).toOption
      .filter(_.nonEmpty)
      .map { host =>
        Proxy(
          host,
          c.getInt("proxy.port"),
          Uri.httpScheme(c.getBoolean("proxy.secure")))
      }

    val maybeForwardProxy =
      if (c.hasPath("forward-proxy") && c.hasPath("forward-proxy.host") && c.hasPath("forward-proxy.port"))
        Some(ForwardProxy(c.getConfig("forward-proxy")))
      else None

    if (c.hasPath("path-style-access"))
      log.warn(
        "The deprecated 'path-style-access' property was used to specify access style. Please use 'access-style' instead.")

    val deprecatedPathAccessStyleSetting = Try(c.getString("path-style-access")).toOption

    // The deprecated property wins when it carries a value; absent or empty defers to `access-style`.
    val accessStyle = deprecatedPathAccessStyleSetting match {
      case None | Some("") =>
        c.getString("access-style") match {
          case "virtual" => VirtualHostAccessStyle
          case "path"    => PathAccessStyle
          case other =>
            throw new IllegalArgumentException(s"'access-style' must be 'virtual' or 'path'. Got: [$other]")
        }
      case Some("true") | Some("force") => PathAccessStyle
      case Some("false")                => VirtualHostAccessStyle
      case Some(other) =>
        throw new IllegalArgumentException(
          s"'path-style-access' must be 'false', 'true' or 'force'. Got: [$other]. Prefer using access-style instead.")
    }

    // An explicit `endpoint-url` takes precedence; otherwise the URL is derived from the proxy, if any.
    val configuredEndpointUrl =
      if (c.hasPath("endpoint-url")) Option(c.getString("endpoint-url"))
      else None
    val endpointUrl =
      configuredEndpointUrl.orElse(maybeProxy.map(p => s"${p.scheme}://${p.host}:${p.port}"))

    if (endpointUrl.isEmpty && accessStyle == PathAccessStyle)
      log.warn(
        s"""It appears you are attempting to use AWS S3 with path-style access.
          |Amazon had planned not to support path-style access to buckets created after September 30, 2020;
          |see (https://aws.amazon.com/blogs/aws/amazon-s3-path-deprecation-plan-the-rest-of-the-story/).
          |(Update, Sept 23, 2020) AWS has delayed this change indefinitely to give customers more time to
          |migrate to virtual host-style access.
          |
          |Enable virtual host-style access by unsetting `$ConfigPath.path-style-access`,
          |and leaving `$ConfigPath.access-style` on the default `virtual`.
          |
          |If your S3 provider is not AWS, you need to set `$ConfigPath.endpoint-url`.""".stripMargin)

    val regionProvider: AwsRegionProvider = {
      val regionProviderPath = "aws.region.provider"

      // `aws.region.default-region` is only read when the region is first requested.
      val staticRegionProvider = new AwsRegionProvider {
        lazy val getRegion: Region = Region.of(c.getString("aws.region.default-region"))
      }

      val configuredProvider =
        if (c.hasPath(regionProviderPath)) Some(c.getString(regionProviderPath))
        else None

      // Anything other than an explicit "static" (including an absent key) uses the default chain.
      configuredProvider match {
        case Some("static") => staticRegionProvider
        case _              => new DefaultAwsRegionProviderChain()
      }
    }

    val credentialsProvider = {
      val credProviderPath = "aws.credentials.provider"

      val configuredProvider =
        if (c.hasPath(credProviderPath)) Some(c.getString(credProviderPath))
        else None

      configuredProvider match {
        case Some("static") =>
          val aki = c.getString("aws.credentials.access-key-id")
          val sak = c.getString("aws.credentials.secret-access-key")
          val tokenPath = "aws.credentials.token"
          // Session credentials are used only when a token is configured.
          val creds: AwsCredentials =
            if (c.hasPath(tokenPath)) AwsSessionCredentials.create(aki, sak, c.getString(tokenPath))
            else AwsBasicCredentials.create(aki, sak)
          StaticCredentialsProvider.create(creds)

        case Some("anon") =>
          AnonymousCredentialsProvider.create()

        // "default", any unrecognised name, or an absent key.
        case _ =>
          DefaultCredentialsProvider.create()
      }
    }

    val apiVersionPath = "list-bucket-api-version"
    val apiVersion = Try(c.getInt(apiVersionPath)).toOption match {
      case Some(1) => ApiVersion.ListBucketVersion1
      case Some(2) => ApiVersion.ListBucketVersion2
      case unsupported =>
        // Keep the lenient fallback to version 2, but do not hide a present-yet-unusable value.
        if (c.hasPath(apiVersionPath)) {
          val got = unsupported.map(_.toString).getOrElse("a non-integer value")
          log.warn(s"'$apiVersionPath' must be 1 or 2. Got: [$got]. Falling back to 2.")
        }
        ApiVersion.ListBucketVersion2
    }
    val validateObjectKey = c.getBoolean("validate-object-key")

    val retrySettings = RetrySettings(c.getConfig("retry-settings"))

    val multipartUploadSettings = MultipartUploadSettings(
      RetrySettings(c.getConfig("multipart-upload").getConfig("retry-settings")))

    val signAnonymousRequests = c.getBoolean("sign-anonymous-requests")

    new S3Settings(
      bufferType,
      credentialsProvider,
      regionProvider,
      accessStyle,
      endpointUrl,
      apiVersion,
      maybeForwardProxy,
      validateObjectKey,
      retrySettings,
      multipartUploadSettings,
      signAnonymousRequests)
  }