private def loadResponseBody()

in app/services/FileMove/ImprovedLargeFileCopier.scala [383:444]


  /**
    * Reads the whole entity of the given response into memory and decodes it as a UTF-8 string.
    *
    * The chunks are combined with a fold seeded by an empty ByteString rather than a reduce: a reduce
    * fails the materialized Future with NoSuchElementException when the stream emits no elements, which
    * would hide the real HTTP error from the caller when a response carries no body data.
    * @param response the HTTP response whose entity should be consumed
    * @return a Future containing the body text; this is an empty string if the entity emitted no data
    */
  private def loadResponseBody(response:HttpResponse) = response.entity
    .dataBytes
    .runWith(Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _))
    .map(_.utf8String)

  /**
    * Initiates a multipart upload to the given object.
    *
    * Builds a POST request for the object with the `uploads` query string and a private ACL header, signs it,
    * sends it through the connection pool and extracts the `UploadId` element from the XML response body.
    * @param region AWS region within which to operate
    * @param credentialsProvider AWS Credentials provider. This can be None, if so then no authentication takes place and the request is made anonymously
    * @param destBucket bucket to create the object in
    * @param destKey key to create the object with
    * @param metadata HeadInfo providing the content type metadata to use. If the content type cannot be parsed, `application/octet-stream` is used instead
    * @param poolClientFlow implicitly provided HostConnectionPool instance
    * @return a Future containing the upload ID. On error, the future will be failed: this covers a failed request,
    *         any response status other than 200 OK, and a 200 response that carries no upload ID
    */
  def initiateMultipartUpload(region:Regions, credentialsProvider:Option[AwsCredentialsProvider], destBucket:String, destKey:String, metadata:HeadInfo)
                             (implicit poolClientFlow:HostConnectionPool[Any]):Future[String] = {
    val req = createRequest(HttpMethods.POST, region, destBucket, destKey, None) { partialRequest=>
      //fall back to a generic binary type rather than failing the whole upload on an unparseable content type
      val contentType = ContentType.parse(metadata.contentType) match {
        case Right(ct)=>ct
        case Left(errs)=>
          logger.warn(s"S3 provided content-type '${metadata.contentType}' was not acceptable to Akka: ${errs.mkString(";")}")
          ContentTypes.`application/octet-stream`
      }

      partialRequest
        .withUri(partialRequest.uri.withRawQueryString("uploads"))
        .withHeaders(partialRequest.headers ++ Seq(
          RawHeader("x-amz-acl", "private"),
        ))
        .withEntity(HttpEntity.empty(contentType))
    }

    Source
      .single(req)
      .mapAsync(1)(reqparts=>doRequestSigning(reqparts._1, region, credentialsProvider).map(rq=>(rq, ())))
      .via(poolClientFlow)
      .runWith(Sink.head)
      .flatMap({
        case (Success(response), _)=>
          (response.status: @switch) match {
            case StatusCodes.OK =>
              loadResponseBody(response)
                .map(scala.xml.XML.loadString)
                .map(elems => (elems \\ "UploadId").text)
                .flatMap(uploadId => {
                  if(uploadId.isEmpty) {
                    //a missing UploadId element yields an empty string; treat that as a failure instead of
                    //handing the caller an unusable ID
                    logger.error(s"Response for initiating multipart upload for s3://$destBucket/$destKey did not contain an upload ID")
                    Future.failed(new RuntimeException(s"No upload ID was returned for s3://$destBucket/$destKey"))
                  } else {
                    logger.info(s"Successfully initiated a multipart upload with ID $uploadId")
                    Future.successful(uploadId)
                  }
                })
            case _=>
              //the body is always read, even on error, so that the error content can be logged
              loadResponseBody(response)
                .flatMap(errContent=> {
                  logger.error(s"Could not initiate multipart upload for s3://$destBucket/$destKey: ${response.status}")
                  logger.error(s"s3://$destBucket/$destKey: $errContent")
                  Future.failed(new RuntimeException(s"Server error ${response.status}"))
                })
          }
        case (Failure(err), _) =>
          logger.error(s"Could not initiate multipart upload for s3://$destBucket/$destKey: ${err.getMessage}", err)
          Future.failed(err)
      })
  }