in app/services/FileMove/ImprovedLargeFileCopier.scala [457:499]
/**
  * Sends the S3 "CompleteMultipartUpload" request for an in-progress multipart upload and interprets the reply.
  *
  * The request body lists every entry of `parts` (via its `toXml`) in ascending `partNumber` order; the request is
  * built with `createRequest`, signed with `doRequestSigning` and pushed through the supplied connection pool.
  *
  * S3 can answer this call with `200 OK` and still report a failure as an `<Error>` document in the body, so a 200
  * response is inspected for an embedded error before it is handed to `CompletedUpload.fromXMLString`.
  *
  * @param region              region handed to `createRequest` and `doRequestSigning`
  * @param credentialsProvider optional credentials provider handed to `doRequestSigning`
  * @param destBucket          bucket that the upload targets
  * @param destKey             object key that the upload targets
  * @param uploadId            ID of the multipart upload to complete; sent as the `uploadId` query parameter
  * @param parts               the uploaded parts, in any order (they are sorted by part number here)
  * @param poolClientFlow      implicitly provided connection pool flow that the signed request is sent through
  * @return a Future containing the value parsed by `CompletedUpload.fromXMLString` on success. The Future fails with
  *         a RuntimeException if the server returns a non-200 status, a 200 carrying an embedded `<Error>`, or a body
  *         that can't be parsed; it fails with the underlying error if the pool could not perform the request.
  */
def completeMultipartUpload(region:Regions, credentialsProvider:Option[AwsCredentialsProvider], destBucket:String, destKey:String, uploadId:String, parts:Seq[UploadedPart])
                           (implicit poolClientFlow:HostConnectionPool[Any]) = {
  val xmlContent = <CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    {
    parts.sortBy(_.partNumber).map(_.toXml) //must put the parts into ascending order
    }
  </CompleteMultipartUpload>

  val req = createRequest(HttpMethods.POST, region, destBucket, destKey, None) { partialReq=>
    partialReq
      .withUri(partialReq.uri.withRawQueryString(s"uploadId=$uploadId"))
      .withEntity(HttpEntity(xmlContent.toString()))
  }

  //Yields "Code: Message" when `body` is an S3 <Error> document; None for any other content, including content
  //that is not well-formed XML (that case is left to the normal response parsing below to report).
  def embeddedS3Error(body:String):Option[String] =
    scala.util.Try(scala.xml.XML.loadString(body)).toOption
      .filter(_.label == "Error")
      .map(errorDoc=>s"${(errorDoc \ "Code").text}: ${(errorDoc \ "Message").text}")

  Source
    .single(req)
    .mapAsync(1)(reqparts=>doRequestSigning(reqparts._1, region, credentialsProvider).map(rq=>(rq, ())))
    .via(poolClientFlow)
    .runWith(Sink.head)
    .flatMap({
      case (Success(response), _)=>
        //always read the body, whatever the status, so that it can be parsed or included in the error log
        loadResponseBody(response).map(content=>{
          response.status match {
            case StatusCodes.OK=>
              //S3 may pad the start of the body with whitespace while it is still processing the request;
              //strip it so that it can't get in the way of XML parsing
              val body = content.trim
              embeddedS3Error(body) match {
                case Some(errorDetail)=>
                  //a 200 status does not guarantee success for this call, the failure can be inside the body
                  logger.error(s"Copy to s3://$destBucket/$destKey failed with an error embedded in a ${response.status} response: $content")
                  throw new RuntimeException(s"Server error $errorDetail")
                case None=>
                  CompletedUpload.fromXMLString(body) match {
                    case Success(completedUpload)=>
                      logger.info(s"Copy to s3://$destBucket/$destKey completed with eTag ${completedUpload.eTag}")
                      completedUpload
                    case Failure(err)=>
                      logger.error(s"Copy to s3://$destBucket/$destKey completed but could not parse the response: ${err.getMessage}")
                      logger.error(s"s3://$destBucket/$destKey raw response was $content")
                      throw new RuntimeException("Could not parse completed-upload response")
                  }
              }
            case _=>
              logger.error(s"Copy to s3://$destBucket/$destKey failed with error ${response.status}: $content")
              throw new RuntimeException(s"Server error ${response.status}")
          }
        })
      case (Failure(err), _)=>
        logger.error(s"Could not complete copy to s3://$destBucket/$destKey: ${err.getMessage}", err)
        Future.failed(err)
    })
}