app/services/FileMove/ImprovedLargeFileCopier.scala
package services.FileMove
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.{Host, RawHeader}
import akka.http.scaladsl.model.{ContentType, ContentTypes, HttpEntity, HttpHeader, HttpMethod, HttpMethods, HttpRequest, HttpResponse, StatusCode, StatusCodes}
import akka.stream.{KillSwitches, Materializer}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.util.ByteString
import com.amazonaws.regions.{Region, Regions}
import helpers.S3Signer
import org.slf4j.LoggerFactory
import services.FileMove.ImprovedLargeFileCopier.{CompletedUpload, HeadInfo, UploadPart, UploadedPart, copySourcePath}
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import java.net.{URI, URLEncoder}
import java.nio.charset.StandardCharsets
import java.nio.file.Paths
import javax.inject.{Inject, Singleton}
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
object ImprovedLargeFileCopier {
private val logger = LoggerFactory.getLogger(getClass)
def NoRequestCustomisation(httpRequest: HttpRequest) = httpRequest
val defaultPartSize:Long = 10*1024*1024 //default part size is 10MiB
/**
* AWS specifications say that parts must be at least 5MB in size but no more than 5GB, and that an upload
* can contain at most 10,000 parts (S3 also caps the total object size at 5TB)
* @param totalFileSize actual size of the file to upload, in bytes
* @return the target part size, in bytes
*/
def estimatePartSize(totalFileSize:Long):Long = {
val maxWantedParts = 10000
//scale the part size up in whole multiples of the default until the part count fits within the AWS limit
val partSize = Iterator.from(1)
.map(multiple => defaultPartSize * multiple)
.find(candidate => Math.ceil(totalFileSize.toDouble / candidate.toDouble).toInt <= maxWantedParts)
.getOrElse(defaultPartSize)
val nParts = Math.ceil(totalFileSize.toDouble / partSize.toDouble).toInt
logger.info(s"Part size estimated at $partSize for $nParts target parts")
partSize
}
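//Illustrative example (not part of the original file): a 100GiB source at the default 10MiB part size
//would need 10,240 parts, which exceeds the 10,000-part limit, so the estimate doubles the part size:
//  estimatePartSize(100L * 1024 * 1024 * 1024) == 20971520L  //2 * defaultPartSize, giving 5,120 parts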
/**
* returns a path suitable for the `x-amz-copy-source` header. This is URL-encoded.
*/
def copySourcePath(bucket:String, key:String, version:Option[String]) = {
URLEncoder.encode(s"$bucket/$key${version.map(v=>s"?versionId=$v").getOrElse("")}", StandardCharsets.UTF_8)
}
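//Illustrative example: URLEncoder percent-encodes the bucket/key separator and any query characters, e.g.
//  copySourcePath("my-bucket", "path/to/file.mxf", None) == "my-bucket%2Fpath%2Fto%2Ffile.mxf"
//A versioned source additionally has its '?' and '=' encoded as %3F and %3D.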
case class HeadInfo(
bucket:String,
key:String,
version:Option[String],
lastModified:String,
contentLength:Long,
eTag:Option[String],
contentType:String,
contentEncoding:Option[String],
contentLanguage:Option[String],
)
object HeadInfo {
/**
* Build a HeadInfo object from the provided HTTP headers. The resulting HeadInfo is NOT validated.
* @param bucket bucket containing the object
* @param key key of the object
* @param version optional version of the object
* @param headers HTTP headers returned by the HEAD request
* @param entityContentType content type reported on the response entity, used if no Content-Type header is present
* @param entityContentLength content length reported on the response entity, used if no Content-Length header is present
* @return an unvalidated HeadInfo instance
*/
def apply(bucket:String, key:String, version:Option[String], headers:Seq[HttpHeader], entityContentType:ContentType, entityContentLength:Option[Long]) = {
def updateHeadInfo(existing:HeadInfo, nextHeader:HttpHeader) = nextHeader.name() match {
case "Last-Modified"=>existing.copy(lastModified=nextHeader.value())
case "Content-Length"=>existing.copy(contentLength=Try { nextHeader.value().toLong}.toOption.getOrElse(-1L))
case "ETag"=>existing.copy(eTag=Some(nextHeader.value()))
case "Content-Type"=>existing.copy(contentType=nextHeader.value())
case "Content-Encoding"=>existing.copy(contentEncoding=Some(nextHeader.value()))
case "Content-Language"=>existing.copy(contentLanguage=Some(nextHeader.value()))
case _=>existing
}
val initial = new HeadInfo(bucket, key, version, "", -1L, None, "", None,None)
val fromHeaders = headers.foldLeft(initial)((acc,elem)=>updateHeadInfo(acc, elem))
val withContentType = if(fromHeaders.contentType=="") {
fromHeaders.copy(contentType = entityContentType.toString())
} else {
fromHeaders
}
if(withContentType.contentLength == -1L && entityContentLength.isDefined) {
withContentType.copy(contentLength = entityContentLength.get)
} else {
withContentType
}
}
}
case class CompletedUpload(location:String, bucket:String, key:String, eTag:String, crc32:Option[String], crc32c:Option[String], sha1:Option[String], sha256:Option[String])
object CompletedUpload {
def fromXMLString(xmlString:String) = {
for {
parsed <- Try { scala.xml.XML.loadString(xmlString) }
result <- Try {
new CompletedUpload(
(parsed \\ "Location").text,
(parsed \\ "Bucket").text,
(parsed \\ "Key").text,
(parsed \\ "ETag").text,
(parsed \\ "ChecksumCRC32").headOption.map(_.text),
(parsed \\ "ChecksumCRC32C").headOption.map(_.text),
(parsed \\ "ChecksumSHA1").headOption.map(_.text),
(parsed \\ "ChecksumSHA256").headOption.map(_.text),
)
}
} yield result
}
}
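//Illustrative example: fromXMLString expects an S3 CompleteMultipartUploadResult document. The element names
//below are the ones the parser looks for; the values are invented.
//  val sample =
//    """<CompleteMultipartUploadResult>
//      |  <Location>https://example-bucket.s3.eu-west-1.amazonaws.com/example-key</Location>
//      |  <Bucket>example-bucket</Bucket>
//      |  <Key>example-key</Key>
//      |  <ETag>"3858f62230ac3c915f300c664312c11f-9"</ETag>
//      |</CompleteMultipartUploadResult>""".stripMargin
//  CompletedUpload.fromXMLString(sample)  //Success(CompletedUpload(..., eTag="\"3858f62230ac3c915f300c664312c11f-9\"", crc32=None, ...))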
case class UploadPart(bucket:String, key:String, start:Long, end:Long, partNumber:Int)
case class UploadedPart(partNumber:Int, uploadId:String, uploadedETag:String) {
/**
* returns an XML element for inclusion in the CompleteMultipartUpload request
* @return
*/
def toXml = <Part>
<ETag>{uploadedETag}</ETag>
<PartNumber>{partNumber}</PartNumber>
</Part>
}
private val etagPartsXtractor = "\"*.*-(\\d+)\"*$".r
def partsFromEtag(etag:String) = etag match {
case etagPartsXtractor(partCount)=>
val partCountInt = partCount.toInt //this should be safe because the regex ensures that partCount is only digits
logger.debug(s"etag of $etag gives partCount of $partCountInt")
Some(partCountInt)
case _=>
logger.error(s"Could not get parts count from etag $etag")
None
}
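//Illustrative example: multipart uploads produce ETags of the form "<md5>-<partCount>", so
//  partsFromEtag("\"d41d8cd98f00b204e9800998ecf8427e-12\"") == Some(12)
//  partsFromEtag("\"d41d8cd98f00b204e9800998ecf8427e\"") == None  //single-part upload, no parts suffix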
/**
* builds a list of UploadPart instances corresponding to the given file
* @param destBucket bucket that the parts will be written to
* @param destKey key that the parts will be written to
* @param metadata HeadInfo describing the _source_ file
* @return a sequence of UploadPart instances, representing the start and end points of each chunk of the upload
*/
def deriveParts(destBucket:String, destKey:String, metadata:HeadInfo):Seq[UploadPart] = {
val partSize = estimatePartSize(metadata.contentLength)
var ptr: Long = 0
var ctr: Int = 1 //partNumber starts from 1 according to the AWS spec
val output = scala.collection.mutable.ListBuffer[UploadPart]()
while (ptr < metadata.contentLength) {
val chunkEnd = if (ptr + partSize > metadata.contentLength) {
metadata.contentLength - 1 //range is zero-based so the last byte is contentLength-1
} else {
ptr + partSize - 1
}
output += UploadPart(destBucket, destKey, ptr, chunkEnd, ctr)
ctr += 1
ptr += partSize
}
logger.info(s"s3://${destBucket}/${destKey} - ${output.length} parts of ${partSize} bytes each")
output.toSeq
}
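//Illustrative example: given a HeadInfo (called `metadata` here, assumed) with contentLength of 25MiB,
//the parts are three zero-based, inclusive byte ranges (the final part may be shorter than the rest):
//  deriveParts("dest-bucket", "dest-key", metadata.copy(contentLength = 25L * 1024 * 1024)) ==
//    Seq(UploadPart("dest-bucket", "dest-key", 0L, 10485759L, 1),
//        UploadPart("dest-bucket", "dest-key", 10485760L, 20971519L, 2),
//        UploadPart("dest-bucket", "dest-key", 20971520L, 26214399L, 3))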
}
/**
* Copies large files using the S3 multipart upload API (UploadPartCopy) directly via an Akka HTTP connection pool
*/
@Singleton
class ImprovedLargeFileCopier @Inject() (implicit actorSystem:ActorSystem, override val mat:Materializer, override val ec:ExecutionContext) extends S3Signer {
override protected val logger = LoggerFactory.getLogger(getClass)
//Full type of "poolClientFlow" to save on typing :)
type HostConnectionPool[T] = Flow[(HttpRequest, T), (Try[HttpResponse], T), Http.HostConnectionPool]
def makeS3Uri(region:Regions, bucket:String, key:String, maybeVersion:Option[String]) = {
val bucketAndKey = Paths
.get("/", bucket, key)
.toString
logger.debug(s"makeS3Uri - URL path is $bucketAndKey")
new URI("https", s"s3.${region.getName}.amazonaws.com", bucketAndKey, maybeVersion.map(v => s"versionId=$v").orNull, null).toASCIIString
}
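//Illustrative example: requests are addressed path-style to the regional endpoint, e.g.
//  makeS3Uri(Regions.EU_WEST_1, "my-bucket", "path/to/file.mxf", None)
//    == "https://s3.eu-west-1.amazonaws.com/my-bucket/path/to/file.mxf"
//  makeS3Uri(Regions.EU_WEST_1, "my-bucket", "path/to/file.mxf", Some("abc123"))
//    == "https://s3.eu-west-1.amazonaws.com/my-bucket/path/to/file.mxf?versionId=abc123"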
/**
* Creates a generic Akka HttpRequest object with the given parameters. If further customisation is required, provide
* a callback which will take the built HttpRequest and change it, returning the updated copy. If not, then pass
* ImprovedLargeFileCopier.NoRequestCustomisation as the callback.
* This is used in a graph to provide an input to the poolClientFlow.
* @param method HTTP method
* @param region AWS region to talk to S3
* @param sourceBucket source bucket name
* @param sourceKey source key
* @param sourceVersion optional, version of the source file. Specify None if not using versioning or to default to latest
* @param extraData arbitrary value passed through unchanged alongside the request, used to correlate responses from the connection pool
* @param customiser callback function that can be used to update the request model and return it
* @return a tuple of the customised HttpRequest and `extraData`
*/
protected def createRequestFull[T](method:HttpMethod, region:Regions, sourceBucket:String, sourceKey:String, sourceVersion:Option[String], extraData:T)
(customiser:((HttpRequest)=>HttpRequest)) = (
{
val targetUri = makeS3Uri(region, sourceBucket, sourceKey, sourceVersion)
logger.debug(s"Target URI is $targetUri")
customiser(HttpRequest(method, uri=targetUri, headers=Seq()))
},
extraData
)
protected def createRequest(method:HttpMethod, region:Regions, sourceBucket:String, sourceKey:String, sourceVersion:Option[String])
(customiser:((HttpRequest)=>HttpRequest)) =
createRequestFull[Unit](method, region, sourceBucket, sourceKey, sourceVersion, ())(customiser)
/**
* Builds a stream and performs a chunked copy
* @param region AWS region to work in
* @param credentialsProvider AWS credentials provider. If not given then no authentication is used
* @param uploadId multipart upload ID. This must be created with `InitiateMultipartUpload`
* @param parts a list of `UploadPart` instances describing the chunks to copy the file with
* @param metadata HeadInfo object describing the _source_ file
* @param poolClientFlow implicitly provided HostConnectionPool
* @return a Future that completes when all parts are confirmed. If any parts fail then errors are logged and the future fails.
* On success, the Future contains a sequence of `UploadedPart` objects, each carrying the etag of the completed part.
* NOTE: these are not necessarily in order!
*/
def sendPartCopies(region:Regions, credentialsProvider:Option[AwsCredentialsProvider], uploadId:String, parts:Seq[UploadPart], metadata:HeadInfo)
(implicit poolClientFlow:HostConnectionPool[UploadPart]) = {
val partCount = parts.length
val coreCount = Runtime.getRuntime.availableProcessors()
val parallelism = if(coreCount>=4) (coreCount/2) - 1 else 1
logger.debug(s"sendPartCopies - parallelism is $parallelism based on $coreCount available processors")
Source.fromIterator(()=>parts.iterator)
.map(uploadPart=>{
logger.info(s"Write s3://${uploadPart.bucket}/${uploadPart.key} part ${uploadPart.partNumber} - building request for ${uploadPart.start}->${uploadPart.end}")
createRequestFull(HttpMethods.PUT, region, uploadPart.bucket, uploadPart.key, None, uploadPart) { partialReq=>
partialReq
.withUri(partialReq.uri.withRawQueryString(s"partNumber=${uploadPart.partNumber}&uploadId=$uploadId"))
.withHeaders(partialReq.headers ++ Seq(
RawHeader("x-amz-copy-source", copySourcePath(metadata.bucket, metadata.key, metadata.version)),
RawHeader("x-amz-copy-source-range", s"bytes=${uploadPart.start}-${uploadPart.end}"),
))
}
})
.mapAsync(parallelism)(reqparts=>{
val uploadPart = reqparts._2
logger.info(s"Write s3://${uploadPart.bucket}/${uploadPart.key} part ${uploadPart.partNumber} - signing request for ${uploadPart.start}->${uploadPart.end}")
doRequestSigning(reqparts._1, region, credentialsProvider).map(req=>(req, reqparts._2))
})
.via(poolClientFlow)
.mapAsyncUnordered(parallelism)({
case (Success(response), uploadPart)=>
logger.info(s"Write s3://${uploadPart.bucket}/${uploadPart.key} part ${uploadPart.partNumber} - received response ${response.status}")
loadResponseBody(response).map(responseBody=>{
response.status match {
case StatusCodes.OK=>
val maybeEtag = for {
xmlContent <- Try { scala.xml.XML.loadString(responseBody) }
etag <- Try { (xmlContent \\ "ETag").text }
} yield etag
maybeEtag match {
case Success(contentEtag)=>
logger.info(s"s3://${metadata.bucket}/${metadata.key}: Uploaded part ${uploadPart.partNumber}/$partCount successfully, etag was $contentEtag.")
Some(UploadedPart(uploadPart.partNumber, uploadId, contentEtag))
case Failure(err)=>
logger.error(s"s3://${metadata.bucket}/${metadata.key}: could not understand XML content. Error was ${err.getMessage}")
logger.error(s"s3://${metadata.bucket}/${metadata.key}: content was $responseBody")
None
}
case _=>
logger.error(s"s3://${metadata.bucket}/${metadata.key}: server returned error ${response.status}")
logger.error(s"s3://${metadata.bucket}/${metadata.key}: $responseBody")
None
}
})
case (Failure(err), _)=>
logger.error(s"s3://${metadata.bucket}/${metadata.key}: could not complete upload part request: ${err.getMessage}", err)
Future(None)
})
.toMat(Sink.seq)(Keep.right)
.run()
.map(results=>{
logger.info(s"s3://${metadata.bucket}/${metadata.key} copied all parts. Received ${results.length} results")
val failures = results.count(_.isEmpty)
logger.info(s"s3://${metadata.bucket}/${metadata.key} $failures /${results.length} errors")
if(failures>0) {
logger.error(s"s3://${metadata.bucket}/${metadata.key}: $failures parts out of ${results.length} failed")
throw new RuntimeException(s"s3://${metadata.bucket}/${metadata.key}: $failures parts out of ${results.length} failed")
} else {
logger.info(s"s3://${metadata.bucket}/${metadata.key}: all results passed")
results.collect({case Some(part)=>part})
}
})
}
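//Illustrative example: each successful UploadPartCopy response carries a CopyPartResult body, from which
//only the ETag element is read above. A typical body (values invented) looks like
//  <CopyPartResult>
//    <LastModified>2023-01-01T00:00:00.000Z</LastModified>
//    <ETag>"b54357faf0632cce46e942fa68356b38"</ETag>
//  </CopyPartResult>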
private def doRequestSigning(req:HttpRequest, region:Regions, credentialsProvider:Option[AwsCredentialsProvider]) =
credentialsProvider match {
case Some(creds)=>
logger.info(s"Signing request from ${creds.resolveCredentials().accessKeyId()}")
val reqWithHost = req.withHeaders(req.headers :+ Host(req.uri.authority))
signHttpRequest(reqWithHost, Region.getRegion(region), "s3", creds)
case None=>
logger.warn(s"No credentials provider, attempting un-authenticated access")
Future(req)
}
/**
* Gets the metadata for a given object
* @param region AWS region to talk to S3
* @param credentialsProvider AWS credentials provider. If None then the request is attempted unauthenticated
* @param sourceBucket bucket containing the object
* @param sourceKey key of the object
* @param sourceVersion optional version of the object. Specify None to use the latest version
* @param poolClientFlow implicitly provided HostConnectionPool
* @return a Future containing Some(HeadInfo) if the object exists, None if it does not, or a failure if
* another error occurred
*/
def headSourceFile(region:Regions, credentialsProvider: Option[AwsCredentialsProvider], sourceBucket: String, sourceKey:String, sourceVersion:Option[String])
(implicit poolClientFlow:HostConnectionPool[Any]) = {
val req = createRequest(HttpMethods.HEAD, region, sourceBucket, sourceKey, sourceVersion)(ImprovedLargeFileCopier.NoRequestCustomisation)
Source
.single(req)
.mapAsync(1)(reqparts=>doRequestSigning(reqparts._1, region, credentialsProvider).map(req=>(req, reqparts._2)))
.map(reqparts=>{
logger.info(s"Request to send has ${reqparts._1.headers.length} headers. Signed request headers:")
reqparts._1.headers.foreach(hdr=>logger.info(s"\t${hdr.name()}: ${hdr.value()}"))
reqparts
})
.via(poolClientFlow)
.runWith(Sink.head)
.map({
case (Success(result), _) =>
result.status match {
case StatusCodes.OK =>
logger.info(s"HEAD success on $sourceBucket/$sourceKey@${sourceVersion.getOrElse("LATEST")}")
result.headers.foreach(hdr => {
logger.debug(s"\t${hdr.name()} => ${hdr.value()}")
})
result.discardEntityBytes()
Some(ImprovedLargeFileCopier.HeadInfo(sourceBucket, sourceKey, sourceVersion,
result.headers, result.entity.contentType, result.entity.contentLengthOption)
)
case StatusCodes.NotFound=>
result.discardEntityBytes()
logger.warn(s"No file found for $sourceBucket/$sourceKey@${sourceVersion.getOrElse("LATEST")}")
None
case StatusCodes.Forbidden=>
val contentFut = result
.entity
.dataBytes
.runWith(Sink.fold(ByteString.empty)(_ ++ _))
//blocking call; tolerated here because this is a rare error path and the body is only needed for the log message
val content = Await.result(contentFut, 10.seconds)
logger.error(s"Access was forbidden to $sourceBucket/$sourceKey@${sourceVersion.getOrElse("LATEST")}: '${content.utf8String}'")
throw new RuntimeException("Access forbidden")
case _=>
result.discardEntityBytes()
logger.error(s"Unexpected response from S3 in HEAD operation: ${result.status.value}")
throw new RuntimeException(s"Unexpected response from S3: ${result.status.value}")
}
case (Failure(err), _) =>
logger.error(s"Could not retrieve metadata for s3://$sourceBucket/$sourceKey@${sourceVersion.getOrElse("LATEST")}:" +
s" ${err.getMessage}", err)
throw err //fail the future
})
}
private def loadResponseBody(response:HttpResponse) = response.entity
.dataBytes
.runWith(Sink.fold(ByteString.empty)(_ ++ _)) //fold rather than reduce, so an empty entity yields an empty string instead of failing
.map(_.utf8String)
/**
* Initiates a multipart upload to the given object.
* @param region AWS region within which to operate
* @param credentialsProvider AWS Credentials provider. This can be None, if so then no authentication takes place and the request is made anonymously
* @param destBucket bucket to create the object in
* @param destKey key to create the object with
* @param metadata HeadInfo providing the content type metadata to use
* @param poolClientFlow implicitly provided HostConnectionPool instance
* @return a Future containing the upload ID. On error, the future will be failed
*/
def initiateMultipartUpload(region:Regions, credentialsProvider:Option[AwsCredentialsProvider], destBucket:String, destKey:String, metadata:HeadInfo)
(implicit poolClientFlow:HostConnectionPool[Any]) = {
val req = createRequest(HttpMethods.POST, region, destBucket, destKey, None) { partialRequest=>
val contentType = ContentType.parse(metadata.contentType) match {
case Right(ct)=>ct
case Left(errs)=>
logger.warn(s"S3 provided content-type '${metadata.contentType}' was not acceptable to Akka: ${errs.mkString(";")}")
ContentTypes.`application/octet-stream`
}
partialRequest
.withUri(partialRequest.uri.withRawQueryString("uploads"))
.withHeaders(partialRequest.headers ++ Seq(
RawHeader("x-amz-acl", "private"),
))
.withEntity(HttpEntity.empty(contentType))
}
Source
.single(req)
.mapAsync(1)(reqparts=>doRequestSigning(reqparts._1, region, credentialsProvider).map(rq=>(rq, ())))
.via(poolClientFlow)
.runWith(Sink.head)
.flatMap({
case (Success(response), _)=>
response.status match {
case StatusCodes.OK =>
loadResponseBody(response)
.map(scala.xml.XML.loadString)
.map(elems => (elems \\ "UploadId").text)
.map(uploadId => {
logger.info(s"Successfully initiated a multipart upload with ID $uploadId")
uploadId
})
case _=>
loadResponseBody(response)
.flatMap(errContent=> {
logger.error(s"Could not initiate multipart upload for s3://$destBucket/$destKey: ${response.status}")
logger.error(s"s3://$destBucket/$destKey: $errContent")
Future.failed(new RuntimeException(s"Server error ${response.status}"))
})
}
case (Failure(err), _) =>
logger.error(s"Could not initiate multipart upload for s3://$destBucket/$destKey: ${err.getMessage}", err)
Future.failed(err)
})
}
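//Illustrative example: a successful InitiateMultipartUpload call returns an InitiateMultipartUploadResult
//body, from which only the UploadId element is read above. A typical body (values invented) looks like
//  <InitiateMultipartUploadResult>
//    <Bucket>example-bucket</Bucket>
//    <Key>example-key</Key>
//    <UploadId>VXBsb2FkIElEIGZvciBleGFtcGxlLW9iamVjdA</UploadId>
//  </InitiateMultipartUploadResult>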
/**
* completes an in-progress multipart upload. This tells S3 to combine all the parts into one file and verify it.
* @param region AWS region within which to operate
* @param credentialsProvider AWS credentials provider for authentication. If None then no authentication is performed.
* @param destBucket bucket that is being written to
* @param destKey file that is being written to
* @param uploadId upload ID of the in-progress upload
* @param parts a list of `UploadedPart` giving details of the individual part copies
* @param poolClientFlow implicitly provided HostConnectionPool instance
* @return a Future containing a CompletedUpload instance. On error, the future will be failed.
*/
def completeMultipartUpload(region:Regions, credentialsProvider:Option[AwsCredentialsProvider], destBucket:String, destKey:String, uploadId:String, parts:Seq[UploadedPart])
(implicit poolClientFlow:HostConnectionPool[Any]) = {
val xmlContent = <CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
{
parts.sortBy(_.partNumber).map(_.toXml) //must put the parts into ascending order
}
</CompleteMultipartUpload>
val req = createRequest(HttpMethods.POST, region, destBucket, destKey, None) { partialReq=>
partialReq
.withUri(partialReq.uri.withRawQueryString(s"uploadId=$uploadId"))
.withEntity(HttpEntity(xmlContent.toString()))
}
Source
.single(req)
.mapAsync(1)(reqparts=>doRequestSigning(reqparts._1, region, credentialsProvider).map(rq=>(rq, ())))
.via(poolClientFlow)
.runWith(Sink.head)
.flatMap({
case (Success(response), _)=>
loadResponseBody(response).map(content=>{
response.status match {
case StatusCodes.OK=>
CompletedUpload.fromXMLString(content) match {
case Success(completedUpload)=>
logger.info(s"Copy to s3://$destBucket/$destKey completed with eTag ${completedUpload.eTag}")
completedUpload
case Failure(err)=>
logger.error(s"Copy to s3://$destBucket/$destKey completed but could not parse the response: ${err.getMessage}")
logger.error(s"s3://$destBucket/$destKey raw response was $content")
throw new RuntimeException("Could not parse completed-upload response")
}
case _=>
logger.error(s"Copy to s3://$destBucket/$destKey failed with error ${response.status}: $content")
throw new RuntimeException(s"Server error ${response.status}")
}
})
case (Failure(err), _)=>
logger.error(s"Could not complete copy to s3://$destBucket/$destKey: ${err.getMessage}", err)
Future.failed(err)
})
}
/**
* Cancels an in-progress multipart upload. This should always be called when aborting, to minimise storage charges for orphaned parts
* @param region AWS region to operate in
* @param credentialsProvider AWS credentials provider. If None the request is attempted without authentication
* @param sourceBucket bucket that the multipart upload was initiated on (i.e. the copy destination)
* @param sourceKey key that the multipart upload was initiated on
* @param uploadId multipart upload ID
* @param poolClientFlow implicitly provided HostConnectionPool
* @return a Future with no value. On error, the Future will fail.
*/
def abortMultipartUpload(region:Regions, credentialsProvider:Option[AwsCredentialsProvider], sourceBucket:String, sourceKey:String, uploadId:String)
(implicit poolClientFlow:HostConnectionPool[Any])= {
logger.info(s"Aborting multipart upload s3://$sourceBucket/$sourceKey with ID $uploadId")
Source
.single(createRequest(HttpMethods.DELETE, region, sourceBucket, sourceKey, None) { partialRequest=>
partialRequest.withUri(
partialRequest.uri
.withRawQueryString(s"uploadId=${URLEncoder.encode(uploadId, StandardCharsets.UTF_8)}")
)
})
.mapAsync(1)(reqparts=>doRequestSigning(reqparts._1, region, credentialsProvider).map(rq=>(rq, ())))
.via(poolClientFlow)
.runWith(Sink.head)
.flatMap({
case (Success(response), _)=>
response.status match {
case StatusCodes.OK | StatusCodes.NoContent=> //S3 returns 204 No Content for a successful abort
response.discardEntityBytes()
logger.info(s"Successfully cancelled upload ID $uploadId")
Future( () )
case _=>
loadResponseBody(response).map(errContent=>{
logger.error(s"Could not cancel multipart upload $uploadId: server said ${response.status} $errContent")
throw new RuntimeException(s"Server error ${response.status}")
})
}
})
}
/**
* Creates a new pool client flow. This should be considered internal and is only used externally in testing.
* @param destRegion region to communicate with
* @return a cached host connection pool flow targeting the regional S3 endpoint
*/
def newPoolClientFlow[T](destRegion:Regions) = {
Http().cachedHostConnectionPoolHttps[T](s"s3.${destRegion.getName}.amazonaws.com")
}
/**
* Performs a multipart copy operation
* @param destRegion Region to operate in. This method does not currently support cross-region copying
* @param credentialsProvider AwsCredentialsProvider used for signing requests. If this is None then the requests are attempted unauthenticated
* @param sourceBucket bucket to copy from
* @param sourceKey file to copy from
* @param sourceVersion optional version of the file to copy from
* @param destBucket bucket to copy to
* @param destKey file to copy to. You can't specify a destination version; if versioning is enabled on destBucket
* then a new version is created, otherwise any existing file is overwritten
* @return a Future, containing a CompletedUpload object. On error, the partial upload is aborted and a failed future is returned.
*/
def performCopy(destRegion:Regions, credentialsProvider: Option[AwsCredentialsProvider], sourceBucket:String, sourceKey:String,
sourceVersion:Option[String], destBucket:String, destKey:String) = {
logger.info(s"Initiating ImprovedLargeFileCopier for region $destRegion")
implicit val poolClientFlow = newPoolClientFlow[UploadPart](destRegion)
implicit val genericPoolFlow = poolClientFlow.asInstanceOf[HostConnectionPool[Any]]
logger.info(s"Looking up s3://${sourceBucket}/$sourceKey@$sourceVersion in $destRegion")
headSourceFile(destRegion, credentialsProvider, sourceBucket, sourceKey, sourceVersion).flatMap({
case Some(headInfo)=>
logger.info(s"Got header info for s3://${sourceBucket}/$sourceKey@$sourceVersion in $destRegion")
val parts = ImprovedLargeFileCopier.deriveParts(destBucket, destKey, headInfo)
logger.info(s"s3://${sourceBucket}/$sourceKey@$sourceVersion - ${parts.length} parts")
initiateMultipartUpload(destRegion, credentialsProvider, destBucket, destKey, headInfo)
.flatMap(uploadId=>{
logger.info(s"s3://${sourceBucket}/$sourceKey@$sourceVersion - initiated MP upload to s3://$destBucket/$destKey with ID $uploadId")
sendPartCopies(destRegion, credentialsProvider, uploadId, parts, headInfo)
.flatMap(completedParts=>{
logger.info(s"${sourceBucket}/$sourceKey@$sourceVersion - Copied all parts, completing the upload")
//send the complete-upload confirmation
completeMultipartUpload(destRegion, credentialsProvider, destBucket, destKey, uploadId, completedParts)
})
.recoverWith({
case err:Throwable=> //if any error occurs ensure that the upload is aborted
logger.error(s"Copy to s3://$destBucket/$destKey failed: ${err.getMessage}", err)
abortMultipartUpload(destRegion, credentialsProvider, destBucket, destKey, uploadId)
.flatMap(_=>Future.failed(err))
})
.map(completed=>{
logger.info(s"${sourceBucket}/$sourceKey@$sourceVersion - completed upload with location ${completed.location}, etag ${completed.eTag}")
completed
})
})
case None=>
logger.error(s"Can't copy s3://$sourceBucket/$sourceKey@${sourceVersion.getOrElse("LATEST")} because the source file does not exist")
Future.failed(new RuntimeException(s"Source file did not exist"))
})
}
}
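//Illustrative usage only. The names below (`injector`, the default AWS credential chain) are assumptions
//for the sketch, not taken from this file; in a Play application the copier would normally be injected.
//  val copier = injector.instanceOf[ImprovedLargeFileCopier]
//  val result: Future[ImprovedLargeFileCopier.CompletedUpload] = copier.performCopy(
//    Regions.EU_WEST_1,
//    Some(software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.create()),
//    "source-bucket", "path/to/source.mxf", None,
//    "dest-bucket", "path/to/dest.mxf"
//  )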