in app/services/FileMove/ImprovedLargeFileCopier.scala [383:444]
private def loadResponseBody(response:HttpResponse) = response.entity
.dataBytes
.runWith(Sink.reduce[ByteString](_ ++ _))
.map(_.utf8String)
/**
* Initiates a multipart upload to the given object.
* @param region AWS region within which to operate
* @param credentialsProvider AWS Credentials provider. This can be None, if so then no authentication takes place and the request is made anonymously
* @param destBucket bucket to create the object in
* @param destKey key to create the object with
* @param metadata HeadInfo providing the content type metadata to use
* @param poolClientFlow implicitly provided HostConnectionPool instance
* @return a Future containing the upload ID. On error, the future will be failed
*/
def initiateMultipartUpload(region:Regions, credentialsProvider:Option[AwsCredentialsProvider], destBucket:String, destKey:String, metadata:HeadInfo)
(implicit poolClientFlow:HostConnectionPool[Any]) = {
val req = createRequest(HttpMethods.POST, region, destBucket, destKey, None) { partialRequest=>
val contentType = ContentType.parse(metadata.contentType) match {
case Right(ct)=>ct
case Left(errs)=>
logger.warn(s"S3 provided content-type '${metadata.contentType}' was not acceptable to Akka: ${errs.mkString(";")}'")
ContentTypes.`application/octet-stream`
}
partialRequest
.withUri(partialRequest.uri.withRawQueryString("uploads"))
.withHeaders(partialRequest.headers ++ Seq(
RawHeader("x-amz-acl", "private"),
))
.withEntity(HttpEntity.empty(contentType))
}
Source
.single(req)
.mapAsync(1)(reqparts=>doRequestSigning(reqparts._1, region, credentialsProvider).map(rq=>(rq, ())))
.via(poolClientFlow)
.runWith(Sink.head)
.flatMap({
case (Success(response), _)=>
(response.status: @switch) match {
case StatusCodes.OK =>
loadResponseBody(response)
.map(scala.xml.XML.loadString)
.map(elems => (elems \\ "UploadId").text)
.map(uploadId => {
logger.info(s"Successfully initiated a multipart upload with ID $uploadId")
uploadId
})
case _=>
loadResponseBody(response)
.flatMap(errContent=> {
logger.error(s"Could not initiate multipart upload for s3://$destBucket/$destKey: ${response.status}")
logger.error(s"s3://$destBucket/$destKey: $errContent")
Future.failed(new RuntimeException(s"Server error ${response.status}"))
})
}
case (Failure(err), _) =>
logger.error(s"Could not initate multipart upload for s3://$destBucket/$destKey: ${err.getMessage}", err)
Future.failed(err)
})
}