def makeBucket()

in s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala [749:825]


  /**
   * Runs the source returned by `makeBucketSource` and discards whatever it emits.
   *
   * @param bucket  bucket name, forwarded unchanged to `makeBucketSource`
   * @param headers S3 headers, forwarded unchanged to `makeBucketSource`
   * @param mat     materializer used to run the stream
   * @param attr    attributes set on the source before it is run
   * @return the `Future[Done]` materialized by `Sink.ignore`
   */
  def makeBucket(bucket: String, headers: S3Headers)(implicit mat: Materializer, attr: Attributes): Future[Done] = {
    val source = makeBucketSource(bucket, headers).withAttributes(attr)
    source.runWith(Sink.ignore)
  }

  /**
   * Builds, via `s3ManagementRequest`, the source for an HTTP `DELETE` management request on `bucket`.
   *
   * The request itself is produced by `bucketManagementRequest(bucket)` and the response is handed to
   * `processS3LifecycleResponse`.
   *
   * @param bucket  name of the bucket the request targets
   * @param headers S3 headers; only those selected by `headersFor(DeleteBucket)` are passed on
   * @return a source of `Done`; nothing is sent until it is materialized
   */
  def deleteBucketSource(bucket: String, headers: S3Headers): Source[Done, NotUsed] = {
    val requestHeaders = headers.headersFor(DeleteBucket)
    s3ManagementRequest[Done](
      bucket = bucket,
      method = HttpMethods.DELETE,
      httpRequest = bucketManagementRequest(bucket),
      requestHeaders,
      process = processS3LifecycleResponse)
  }

  /**
   * Runs `deleteBucketSource` for `bucket` and discards whatever it emits.
   *
   * @param bucket  name of the bucket the request targets
   * @param headers S3 headers, forwarded unchanged to `deleteBucketSource`
   * @param mat     materializer used to run the stream
   * @param attr    attributes set on the source before it is run
   * @return the `Future[Done]` materialized by `Sink.ignore`
   */
  def deleteBucket(bucket: String, headers: S3Headers)(implicit mat: Materializer, attr: Attributes): Future[Done] = {
    val source = deleteBucketSource(bucket, headers).withAttributes(attr)
    source.runWith(Sink.ignore)
  }

  /**
   * Builds, via `s3ManagementRequest`, the source for an HTTP `HEAD` management request on `bucketName`.
   *
   * The response is translated into a [[BucketAccess]] by `processCheckIfExistsResponse`.
   *
   * @param bucketName name of the bucket the request targets
   * @param headers    S3 headers; only those selected by `headersFor(CheckBucket)` are passed on
   * @return a source of [[BucketAccess]]; nothing is sent until it is materialized
   */
  def checkIfBucketExistsSource(bucketName: String, headers: S3Headers): Source[BucketAccess, NotUsed] = {
    val requestHeaders = headers.headersFor(CheckBucket)
    s3ManagementRequest[BucketAccess](
      bucket = bucketName,
      method = HttpMethods.HEAD,
      httpRequest = bucketManagementRequest(bucketName),
      requestHeaders,
      process = processCheckIfExistsResponse)
  }

  /**
   * Runs `checkIfBucketExistsSource` for `bucket` and returns the first element it emits.
   *
   * @param bucket  name of the bucket the request targets
   * @param headers S3 headers, forwarded unchanged to `checkIfBucketExistsSource`
   * @param mat     materializer used to run the stream
   * @param attr    attributes set on the source before it is run
   * @return the `Future[BucketAccess]` materialized by `Sink.head`
   */
  def checkIfBucketExists(bucket: String, headers: S3Headers)(implicit mat: Materializer,
      attr: Attributes): Future[BucketAccess] = {
    val source = checkIfBucketExistsSource(bucket, headers).withAttributes(attr)
    source.runWith(Sink.head)
  }

  /**
   * Curried adapter around `HttpRequests.uploadManagementRequest`.
   *
   * The first parameter list fixes the upload being addressed; the second has the
   * `(HttpMethod, S3Settings)` shape that callers in this file pass as `httpRequest`.
   *
   * @param bucket   bucket part of the [[S3Location]]
   * @param key      key part of the [[S3Location]]
   * @param uploadId upload id forwarded to `HttpRequests.uploadManagementRequest`
   * @param method   HTTP method of the request
   * @param conf     settings passed explicitly as the final argument list
   */
  private def uploadManagementRequest(bucket: String, key: String, uploadId: String)(method: HttpMethod,
      conf: S3Settings): HttpRequest = {
    val location = S3Location(bucket, key)
    HttpRequests.uploadManagementRequest(location, uploadId, method)(conf)
  }

  /**
   * Builds, via `s3ManagementRequest`, the source for an HTTP `DELETE` request addressing the upload
   * identified by `bucket`, `key` and `uploadId`.
   *
   * The request itself is produced by `uploadManagementRequest` and the response is handed to
   * `processS3LifecycleResponse`.
   *
   * @param bucket   bucket the upload belongs to
   * @param key      object key the upload belongs to
   * @param uploadId id of the upload
   * @param headers  S3 headers; only those selected by `headersFor(DeleteBucket)` are passed on
   * @return a source of `Done`; nothing is sent until it is materialized
   */
  def deleteUploadSource(bucket: String, key: String, uploadId: String, headers: S3Headers): Source[Done, NotUsed] = {
    // NOTE(review): headers are resolved with the `DeleteBucket` request type although this
    // request targets an upload, not a bucket - confirm this is intended and not a copy-paste
    // from `deleteBucketSource`.
    val requestHeaders = headers.headersFor(DeleteBucket)
    s3ManagementRequest[Done](
      bucket = bucket,
      method = HttpMethods.DELETE,
      httpRequest = uploadManagementRequest(bucket, key, uploadId),
      requestHeaders,
      process = processS3LifecycleResponse)
  }

  /**
   * Runs `deleteUploadSource` for the given upload and discards whatever it emits.
   *
   * @param bucket   bucket the upload belongs to
   * @param key      object key the upload belongs to
   * @param uploadId id of the upload
   * @param headers  S3 headers, forwarded unchanged to `deleteUploadSource`
   * @param mat      materializer used to run the stream
   * @param attr     attributes set on the source before it is run
   * @return the `Future[Done]` materialized by `Sink.ignore`
   */
  def deleteUpload(bucket: String, key: String, uploadId: String, headers: S3Headers)(implicit mat: Materializer,
      attr: Attributes): Future[Done] = {
    val source = deleteUploadSource(bucket, key, uploadId, headers).withAttributes(attr)
    source.runWith(Sink.ignore)
  }

  /**
   * Curried adapter around `HttpRequests.bucketVersioningRequest`.
   *
   * The first parameter list fixes the bucket, optional MFA status and headers; the second has the
   * `(HttpMethod, S3Settings)` shape that callers in this file pass as `httpRequest`.
   *
   * @param bucket    name of the bucket the request targets
   * @param mfaStatus optional MFA status forwarded as-is
   * @param headers   S3 headers; their `headers` member is what gets forwarded
   * @param method    HTTP method of the request
   * @param conf      settings passed explicitly as the final argument list
   */
  private def bucketVersioningRequest(bucket: String, mfaStatus: Option[MFAStatus], headers: S3Headers)(
      method: HttpMethod,
      conf: S3Settings): HttpRequest = {
    val extraHeaders = headers.headers
    HttpRequests.bucketVersioningRequest(bucket, mfaStatus, method, extraHeaders)(conf)
  }

  /**
   * Builds, via `s3ManagementRequest`, the source for an HTTP `PUT` request carrying the payload
   * produced by `putBucketVersioningPayload(bucketVersioning)`.
   *
   * The request itself is produced by `bucketVersioningRequest`, which also receives
   * `bucketVersioning.mfaDelete`; the response is handed to `processS3LifecycleResponse`.
   *
   * @param bucket           name of the bucket the request targets
   * @param bucketVersioning versioning configuration to send
   * @param headers          S3 headers; used both for building the request and, filtered by
   *                         `headersFor(PutBucketVersioning)`, as the management request headers
   * @return a source of `Done`; nothing is sent until it is materialized
   */
  def putBucketVersioningSource(
      bucket: String, bucketVersioning: BucketVersioning, headers: S3Headers): Source[Done, NotUsed] = {
    val requestHeaders = headers.headersFor(PutBucketVersioning)
    s3ManagementRequest[Done](
      bucket = bucket,
      method = HttpMethods.PUT,
      httpRequest = bucketVersioningRequest(bucket, bucketVersioning.mfaDelete, headers),
      requestHeaders,
      process = processS3LifecycleResponse,
      // The payload is built with the parasitic execution context, passed explicitly.
      httpEntity = Some(putBucketVersioningPayload(bucketVersioning)(ExecutionContexts.parasitic)))
  }

  /**
   * Runs `putBucketVersioningSource` for `bucket` and discards whatever it emits.
   *
   * @param bucket           name of the bucket the request targets
   * @param bucketVersioning versioning configuration, forwarded unchanged
   * @param headers          S3 headers, forwarded unchanged
   * @param mat              materializer used to run the stream
   * @param attr             attributes set on the source before it is run
   * @return the `Future[Done]` materialized by `Sink.ignore`
   */
  def putBucketVersioning(bucket: String, bucketVersioning: BucketVersioning, headers: S3Headers)(
      implicit mat: Materializer,
      attr: Attributes): Future[Done] = {
    val source = putBucketVersioningSource(bucket, bucketVersioning, headers).withAttributes(attr)
    source.runWith(Sink.ignore)
  }

  /**
   * Builds, via `s3ManagementRequest`, the source for an HTTP `GET` request built by
   * `bucketVersioningRequest` (with no MFA status).
   *
   * A response with a successful status has its entity unmarshalled into a
   * [[BucketVersioningResult]]; any other response is passed to `unmarshalError`.
   *
   * @param bucket  name of the bucket the request targets
   * @param headers S3 headers; used both for building the request and, filtered by
   *                `headersFor(GetBucketVersioning)`, as the management request headers
   * @return a source of [[BucketVersioningResult]]; nothing is sent until it is materialized
   */
  def getBucketVersioningSource(
      bucket: String, headers: S3Headers): Source[BucketVersioningResult, NotUsed] = {
    val requestHeaders = headers.headersFor(GetBucketVersioning)
    s3ManagementRequest[BucketVersioningResult](
      bucket = bucket,
      method = HttpMethods.GET,
      httpRequest = bucketVersioningRequest(bucket, None, headers),
      requestHeaders,
      process = { (response: HttpResponse, mat: Materializer) =>
        if (response.status.isSuccess())
          Unmarshal(response.entity).to[BucketVersioningResult](implicitly, ExecutionContexts.parasitic, mat)
        else
          unmarshalError(response.status, response.entity)(mat)
      })
  }