in s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala [749:825]
def makeBucket(bucket: String, headers: S3Headers)(implicit mat: Materializer, attr: Attributes): Future[Done] =
makeBucketSource(bucket, headers).withAttributes(attr).runWith(Sink.ignore)
def deleteBucketSource(bucket: String, headers: S3Headers): Source[Done, NotUsed] =
s3ManagementRequest[Done](
bucket = bucket,
method = HttpMethods.DELETE,
httpRequest = bucketManagementRequest(bucket),
headers.headersFor(DeleteBucket),
process = processS3LifecycleResponse)
def deleteBucket(bucket: String, headers: S3Headers)(implicit mat: Materializer, attr: Attributes): Future[Done] =
deleteBucketSource(bucket, headers).withAttributes(attr).runWith(Sink.ignore)
def checkIfBucketExistsSource(bucketName: String, headers: S3Headers): Source[BucketAccess, NotUsed] =
s3ManagementRequest[BucketAccess](
bucket = bucketName,
method = HttpMethods.HEAD,
httpRequest = bucketManagementRequest(bucketName),
headers.headersFor(CheckBucket),
process = processCheckIfExistsResponse)
def checkIfBucketExists(bucket: String, headers: S3Headers)(implicit mat: Materializer,
attr: Attributes): Future[BucketAccess] =
checkIfBucketExistsSource(bucket, headers).withAttributes(attr).runWith(Sink.head)
private def uploadManagementRequest(bucket: String, key: String, uploadId: String)(method: HttpMethod,
conf: S3Settings): HttpRequest =
HttpRequests.uploadManagementRequest(S3Location(bucket, key), uploadId, method)(conf)
def deleteUploadSource(bucket: String, key: String, uploadId: String, headers: S3Headers): Source[Done, NotUsed] =
s3ManagementRequest[Done](
bucket = bucket,
method = HttpMethods.DELETE,
httpRequest = uploadManagementRequest(bucket, key, uploadId),
headers.headersFor(DeleteBucket),
process = processS3LifecycleResponse)
def deleteUpload(bucket: String, key: String, uploadId: String, headers: S3Headers)(implicit mat: Materializer,
attr: Attributes): Future[Done] =
deleteUploadSource(bucket, key, uploadId, headers).withAttributes(attr).runWith(Sink.ignore)
private def bucketVersioningRequest(bucket: String, mfaStatus: Option[MFAStatus], headers: S3Headers)(
method: HttpMethod,
conf: S3Settings): HttpRequest =
HttpRequests.bucketVersioningRequest(bucket, mfaStatus, method, headers.headers)(conf)
def putBucketVersioningSource(
bucket: String, bucketVersioning: BucketVersioning, headers: S3Headers): Source[Done, NotUsed] =
s3ManagementRequest[Done](
bucket = bucket,
method = HttpMethods.PUT,
httpRequest = bucketVersioningRequest(bucket, bucketVersioning.mfaDelete, headers),
headers.headersFor(PutBucketVersioning),
process = processS3LifecycleResponse,
httpEntity = Some(putBucketVersioningPayload(bucketVersioning)(ExecutionContexts.parasitic)))
def putBucketVersioning(bucket: String, bucketVersioning: BucketVersioning, headers: S3Headers)(
implicit mat: Materializer,
attr: Attributes): Future[Done] =
putBucketVersioningSource(bucket, bucketVersioning, headers).withAttributes(attr).runWith(Sink.ignore)
def getBucketVersioningSource(
bucket: String, headers: S3Headers): Source[BucketVersioningResult, NotUsed] =
s3ManagementRequest[BucketVersioningResult](
bucket = bucket,
method = HttpMethods.GET,
httpRequest = bucketVersioningRequest(bucket, None, headers),
headers.headersFor(GetBucketVersioning),
process = { (response: HttpResponse, mat: Materializer) =>
response match {
case HttpResponse(status, _, entity, _) if status.isSuccess() =>
Unmarshal(entity).to[BucketVersioningResult](implicitly, ExecutionContexts.parasitic, mat)
case response: HttpResponse =>
unmarshalError(response.status, response.entity)(mat)
}
})