in app/services/FileMove/ImprovedLargeFileCopier.scala [222:308]
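// Convenience wrapper around createRequestFull for callers that need no per-request context:
// it simply passes Unit through as the context value.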
protected def createRequest(method:HttpMethod, region:Regions, sourceBucket:String, sourceKey:String, sourceVersion:Option[String])
(customiser:((HttpRequest)=>HttpRequest)) =
createRequestFull[Unit](method, region, sourceBucket, sourceKey, sourceVersion, ())(customiser)
/**
* Builds a stream and performs a chunked copy
* @param region AWS region to work in
* @param credentialsProvider AWS credentials provider. If not given then no authentication is used
* @param uploadId multipart upload ID. This must be created with `InitiateMultipartUpload`
* @param parts a list of `UploadPart` instances describing the byte ranges (chunks) to copy
* @param metadata HeadInfo object describing the _source_ file
* @param poolClientFlow implicitly provided HostConnectionPool
* @return a Future that completes when all parts are confirmed. If any part fails then errors are logged and the Future fails.
* On success, the Future contains a sequence of `UploadedPart` objects, each carrying the etag of the completed part.
* NOTE: these are not necessarily in order! A commented usage sketch follows this method.
*/
def sendPartCopies(region:Regions, credentialsProvider:Option[AwsCredentialsProvider], uploadId:String, parts:Seq[UploadPart], metadata:HeadInfo)
(implicit poolClientFlow:HostConnectionPool[UploadPart]) = {
val partCount = parts.length
val coreCount = Runtime.getRuntime.availableProcessors()
val parallelism = if(coreCount>=4) (coreCount/2) - 1 else 1
logger.debug(s"sendPartCopies - parallelism is $parallelism based on $coreCount available processors")
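// The stream below builds one UploadPartCopy request per part, signs each request, pushes
// them through the shared host connection pool and parses each response, yielding one
// Option[UploadedPart] per part (None indicating a failed part).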
Source.fromIterator(()=>parts.iterator)
.map(uploadPart=>{
logger.info(s"Write s3://${uploadPart.bucket}/${uploadPart.key} part ${uploadPart.partNumber} - building request for ${uploadPart.start}->${uploadPart.end}")
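// The partNumber/uploadId query string plus the x-amz-copy-source and x-amz-copy-source-range
// headers make this an S3 UploadPartCopy request, i.e. a server-side copy of the given byte
// range - no object data flows through this app.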
createRequestFull(HttpMethods.PUT, region, uploadPart.bucket, uploadPart.key, None, uploadPart) { partialReq=>
partialReq
.withUri(partialReq.uri.withRawQueryString(s"partNumber=${uploadPart.partNumber}&uploadId=$uploadId"))
.withHeaders(partialReq.headers ++ Seq(
RawHeader("x-amz-copy-source", copySourcePath(metadata.bucket, metadata.key, metadata.version)),
RawHeader("x-amz-copy-source-range", s"bytes=${uploadPart.start}-${uploadPart.end}"),
))
}
})
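// Sign each (request, part) pair asynchronously; `parallelism` bounds how many signings
// (and, further down, response parses) are in flight at once.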
.mapAsync(parallelism)(reqparts=>{
val uploadPart = reqparts._2
logger.info(s"Write s3://${uploadPart.bucket}/${uploadPart.key} part ${uploadPart.partNumber} - signing request for ${uploadPart.start}->${uploadPart.end}")
doRequestSigning(reqparts._1, region, credentialsProvider).map(req=>(req, reqparts._2))
})
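// Run the signed requests through the cached host connection pool; each response is emitted
// as a (Try[HttpResponse], UploadPart) pair so the part context survives the round trip.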
.via(poolClientFlow)
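// Responses are handled unordered (hence the NOTE in the scaladoc). UploadPartCopy returns
// its result - including the part's ETag - in an XML body, so the body is read and parsed
// even when the status code is 200 OK.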
.mapAsyncUnordered(parallelism)({
case (Success(response), uploadPart)=>
logger.info(s"Write s3://${uploadPart.bucket}/${uploadPart.key} part ${uploadPart.partNumber} - received response ${response.status}")
loadResponseBody(response).map(responseBody=>{
(response.status: @switch) match {
case StatusCodes.OK=>
val maybeEtag = for {
xmlContent <- Try { scala.xml.XML.loadString(responseBody) }
etag <- Try { (xmlContent \\ "ETag").text }
} yield etag
maybeEtag match {
case Success(contentEtag)=>
logger.info(s"s3://${metadata.bucket}/${metadata.key}: Uploaded part ${uploadPart.partNumber}/$partCount successfully, etag was $contentEtag.")
Some(UploadedPart(uploadPart.partNumber, uploadId, contentEtag))
case Failure(err)=>
logger.error(s"s3://${metadata.bucket}/${metadata.key}: could not understand XML content. Error was ${err.getMessage}")
logger.error(s"s3://${metadata.bucket}/${metadata.key}: content was $responseBody")
None
}
case _=>
logger.error(s"s3://${metadata.bucket}/${metadata.key}: server returned error ${response.status}")
logger.error(s"s3://${metadata.bucket}/${metadata.key}: $responseBody")
None
}
})
case (Failure(err), _)=>
logger.error(s"s3://${metadata.bucket}/${metadata.key}: could not complete upload part request: ${err.getMessage}", err)
Future.successful(None)
})
.toMat(Sink.seq)(Keep.right)
.run()
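// Sink.seq gathers one Option[UploadedPart] per part; any None means that part failed,
// in which case the whole Future is failed.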
.map(results=>{
logger.info(s"s3://${metadata.bucket}/${metadata.key} copied all parts. Received ${results.length} results")
val failures = results.count(_.isEmpty)
logger.info(s"s3://${metadata.bucket}/${metadata.key}: $failures/${results.length} parts reported errors")
if(failures>0) {
logger.error(s"s3://${metadata.bucket}/${metadata.key}: $failures parts out of ${results.length} failed")
throw new RuntimeException(s"s3://${metadata.bucket}/${metadata.key}: $failures parts out of ${results.length} failed")
} else {
logger.info(s"s3://${metadata.bucket}/${metadata.key}: all results passed")
results.collect({case Some(part)=>part})
}
})
}
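// A minimal usage sketch for sendPartCopies, kept as a comment since it is not part of the
// copy path. newPoolClientFlow, initiateMultipartUpload, derivePartsFor and
// completeMultipartUpload are hypothetical names standing in for whatever this class / codebase
// actually provides; only sendPartCopies itself is taken from the code above.
//
//   implicit val pool: HostConnectionPool[UploadPart] = newPoolClientFlow(region, metadata.bucket)
//   for {
//     uploadId <- initiateMultipartUpload(region, credsProvider, metadata)          // CreateMultipartUpload
//     parts     = derivePartsFor(metadata)                                          // split source into byte ranges
//     uploaded <- sendPartCopies(region, credsProvider, uploadId, parts, metadata)  // this method
//     _        <- completeMultipartUpload(region, credsProvider, uploadId, uploaded.sortBy(_.partNumber), metadata)
//   } yield ()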