protected def createRequest()

in app/services/FileMove/ImprovedLargeFileCopier.scala [222:308]


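  /** Convenience overload of `createRequestFull` that carries no per-request context (the context type is `Unit`). */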
  protected def createRequest(method:HttpMethod, region:Regions, sourceBucket:String, sourceKey:String, sourceVersion:Option[String])
                             (customiser:((HttpRequest)=>HttpRequest)) =
    createRequestFull[Unit](method, region, sourceBucket, sourceKey, sourceVersion, ())(customiser)

  /**
    * Builds a stream and performs a chunked copy
    * @param region AWS region to work in
    * @param credentialsProvider AWS credentials provider. If not given then no authentication is used
    * @param uploadId multipart upload ID. This must be created with `InitiateMultipartUpload`
    * @param parts a list of `UploadPart` instances describing the chunks to copy the file with
    * @param metadata HeadInfo object describing the _source_ file
    * @param poolClientFlow implicitly provided HostConnectionPool
    * @return a Future that completes when all parts are confirmed. If any parts fail then errors are logged and the future fails.
    *         On success, the Future contains a sequence of `UploadedPart` objects, each carrying the ETag of its completed part.
    *         NOTE: these are not necessarily in order!
    */
  def sendPartCopies(region:Regions, credentialsProvider:Option[AwsCredentialsProvider], uploadId:String, parts:Seq[UploadPart], metadata:HeadInfo)
                 (implicit poolClientFlow:HostConnectionPool[UploadPart]) = {
    val partCount = parts.length

    val coreCount = Runtime.getRuntime.availableProcessors()
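    // Heuristic: use roughly half the available cores minus one (never below 1) so the
    // part copies don't saturate the host, e.g. 8 cores gives a parallelism of 3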
    val parallelism = if(coreCount>=4) (coreCount/2) - 1 else 1
    logger.debug(s"sendPartCopies - paralellism is $parallelism based on $coreCount available processors")

    Source.fromIterator(()=>parts.iterator)
      .map(uploadPart=>{
        logger.info(s"Write s3://${uploadPart.bucket}/${uploadPart.key} part ${uploadPart.partNumber} - building request for ${uploadPart.start}->${uploadPart.end}")
        createRequestFull(HttpMethods.PUT, region, uploadPart.bucket, uploadPart.key, None, uploadPart) { partialReq=>
          partialReq
          .withUri(partialReq.uri.withRawQueryString(s"partNumber=${uploadPart.partNumber}&uploadId=$uploadId"))
          .withHeaders(partialReq.headers ++ Seq(
            RawHeader("x-amz-copy-source", copySourcePath(metadata.bucket, metadata.key, metadata.version)),
            RawHeader("x-amz-copy-source-range", s"bytes=${uploadPart.start}-${uploadPart.end}"),
          ))
        }
      })
      .mapAsync(parallelism)(reqparts=>{
        val uploadPart = reqparts._2
        logger.info(s"Write s3://${uploadPart.bucket}/${uploadPart.key} part ${uploadPart.partNumber} - signing request for ${uploadPart.start}->${uploadPart.end}")
        doRequestSigning(reqparts._1, region, credentialsProvider).map(req=>(req, reqparts._2))
      })
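      // The pool flow runs each signed request over the shared host connection pool and
      // emits (Try[HttpResponse], UploadPart), carrying the part through as context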
      .via(poolClientFlow)
      .mapAsyncUnordered(parallelism)({
        case (Success(response), uploadPart)=>
          logger.info(s"Write s3://${uploadPart.bucket}/${uploadPart.key} part ${uploadPart.partNumber} - received response ${response.status}")
          loadResponseBody(response).map(responseBody=>{
            response.status match {
              case StatusCodes.OK=>
                val maybeEtag = for {
                  xmlContent <- Try { scala.xml.XML.loadString(responseBody) }
                  etag <- Try { (xmlContent \\ "ETag").text }
                } yield etag

                maybeEtag match {
                  case Success(contentEtag)=>
                    logger.info(s"s3://${metadata.bucket}/${metadata.key}: Uploaded part ${uploadPart.partNumber}/$partCount successfully, etag was $contentEtag.")
                    Some(UploadedPart(uploadPart.partNumber, uploadId, contentEtag))
                  case Failure(err)=>
                    logger.error(s"s3://${metadata.bucket}/${metadata.key}: could not understand XML content. Error was ${err.getMessage}")
                    logger.error(s"s3://${metadata.bucket}/${metadata.key}: content was $responseBody")
                    None
                }
              case _=>
                logger.error(s"s3://${metadata.bucket}/${metadata.key}: server returned error ${response.status}")
                logger.error(s"s3://${metadata.bucket}/${metadata.key}: $responseBody")
                None
            }
          })
        case (Failure(err), _)=>
          logger.error(s"s3://${metadata.bucket}/${metadata.key}: could not complete upload part request: ${err.getMessage}", err)
          Future(None)
      })
      .toMat(Sink.seq)(Keep.right)
      .run()
      .map(results=>{
        logger.info(s"s3://${metadata.bucket}/${metadata.key} copied all parts.  Received ${results.length} results")
        val failures = results.count(_.isEmpty)
        logger.info(s"s3://${metadata.bucket}/${metadata.key} $failures /${results.length} errors")
        if(failures>0) {
          logger.error(s"s3://${metadata.bucket}/${metadata.key}: $failures parts out of ${results.length} failed")
          throw new RuntimeException(s"s3://${metadata.bucket}/${metadata.key}: $failures parts out of ${results.length} failed")
        } else {
          logger.info(s"s3://${metadata.bucket}/${metadata.key}: all results passed")
          results.collect({case Some(part)=>part})
        }
      })
  }
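
For orientation, here is a minimal sketch of how `sendPartCopies` might be driven by the surrounding multipart-copy flow. The helpers `initiateMultipartUpload`, `derivePartsFor` and `completeMultipartUpload` are hypothetical names introduced for illustration; only `sendPartCopies` above is taken from the source.

  // Sketch only: initiateMultipartUpload, derivePartsFor and completeMultipartUpload are
  // assumed helpers, not part of the excerpt above
  def copyLargeFile(region:Regions, credentialsProvider:Option[AwsCredentialsProvider], metadata:HeadInfo)
                   (implicit poolClientFlow:HostConnectionPool[UploadPart], ec:ExecutionContext):Future[Unit] =
    for {
      uploadId <- initiateMultipartUpload(region, credentialsProvider, metadata)   //assumed: performs the CreateMultipartUpload call
      parts     = derivePartsFor(metadata)                                         //assumed: splits the object into UploadPart byte ranges
      uploaded <- sendPartCopies(region, credentialsProvider, uploadId, parts, metadata)
      // results come back unordered (mapAsyncUnordered), so sort by part number before completing
      _        <- completeMultipartUpload(region, credentialsProvider, uploadId,
                                          uploaded.sortBy(_.partNumber), metadata) //assumed: performs the CompleteMultipartUpload call
    } yield ()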