def fileString()

in magenta-lib/src/main/scala/magenta/tasks/tasks.scala [77:176]


  /** Renders a file count followed by the noun "file", pluralised for every
    * quantity other than exactly 1 (so 0 and negative numbers read "files").
    */
  def fileString(quantity: Int) = {
    val pluralSuffix = quantity match {
      case 1 => ""
      case _ => "s"
    }
    s"$quantity file$pluralSuffix"
  }

  // end-user friendly description of this task
  /** Human-readable summary naming how many files are uploaded, the target
    * bucket and the path mapping in use.
    */
  def description: String = {
    val uploadCount = fileString(objectMappings.size)
    s"Upload $uploadCount to S3 bucket $bucket using file mapping $paths"
  }

  /** Describes a single transfer for log output: the source and target S3
    * URLs followed by the cache-control, content-type and public-read
    * settings carried by the request.
    */
  def requestToString(source: S3Object, request: PutReq): String = {
    val from = s"s3://${source.bucket}/${source.key}"
    val to = s"s3://${request.target.bucket}/${request.target.key}"
    val settings =
      s"CacheControl:${request.cacheControl} ContentType:${request.contentType} PublicRead:${request.publicReadAcl}"
    s"$from to $to with $settings"
  }

  // execute this task (should throw on failure)
  /** Streams every entry of `requests` from its source S3 object to its
    * target S3 location, processing the entries in parallel.
    *
    * Reports a failure through `resources.reporter.fail` when `totalSize` is
    * zero (NOTE(review): `fail` presumably aborts the task by throwing —
    * confirm against the reporter implementation). Exceptions raised by a
    * transfer propagate out of this method once `retryOnException` gives up.
    *
    * @param resources
    *   supplies the reporter, the artifact S3 client used for downloads and
    *   the execution context the downloads run on
    * @param stopFlag
    *   not consulted by this implementation
    */
  override def execute(
      resources: DeploymentResources,
      stopFlag: => Boolean
  ): Unit = {
    // Only the first few transfers are reported individually; after that a
    // single summary line is emitted so large uploads do not flood the log.
    val detailedLogLimit = 10
    // Capacity of the in-memory pipe between the download and the upload.
    val pipeBufferBytes = 1024 * 1024

    if (totalSize == 0) {
      val locationDescription = paths
        .map {
          case (path: S3Path, _) => path.show()
          case (location, _)     => location.toString
        }
        .mkString("\n")
      resources.reporter.fail(
        s"No files found to upload in $locationDescription"
      )
    }

    val withClient = withClientFactory(
      keyRing,
      region,
      AWS.clientConfigurationNoRetry,
      resources
    )
    withClient { client =>
      resources.reporter.verbose(
        s"Starting transfer of ${fileString(objectMappings.size)} ($totalSize bytes)"
      )
      requests.zipWithIndex.par.foreach { case (req, index) =>
        logger.debug(s"Transferring ${requestToString(req.source, req)}")
        index match {
          case i if i < detailedLogLimit =>
            resources.reporter.verbose(
              s"Transferring ${requestToString(req.source, req)}"
            )
          case i if i == detailedLogLimit =>
            resources.reporter.verbose(
              s"Not logging details for the remaining ${fileString(objectMappings.size - detailedLogLimit)}"
            )
          case _ =>
        }
        // Each attempt builds fresh streams, so a retry starts from a clean
        // pipe rather than a partially consumed one.
        retryOnException(AWS.clientConfiguration) {
          val getObjectRequest = GetObjectRequest
            .builder()
            .bucket(req.source.bucket)
            .key(req.source.key)
            .build()

          // The download writes into `os`; the upload reads the same bytes
          // from `is`, so the object flows through the pipe buffer.
          val os = new PipedOutputStream()
          val is = new PipedInputStream(os, pipeBufferBytes)

          val transformer
              : ResponseTransformer[GetObjectResponse, GetObjectResponse] =
            ResponseTransformer.toOutputStream(os)
          // Run the download on the IO execution context so this thread is
          // free to consume the pipe in putObject below.
          val response: Future[GetObjectResponse] = Future {
            try {
              resources.artifactClient.getObject(getObjectRequest, transformer)
            } finally {
              // Closing the writer end signals end-of-stream to the reader,
              // whether the download succeeded or failed.
              os.close()
            }
          }(resources.ioExecutionContext)

          val putRequest: PutObjectRequest = req.toAwsRequest
          val result =
            try {
              client.putObject(
                putRequest,
                AWSRequestBody.fromContentProvider(
                  () => is,
                  req.source.size,
                  req.mimeType
                )
              )
            } finally {
              is.close()
            }

          import scala.concurrent.duration._
          // Wait for the download side to finish so any failure it recorded
          // is rethrown here. Method-call syntax avoids the postfix-operator
          // form (`5 minutes`), which needs scala.language.postfixOps.
          Await.result(response, 5.minutes)

          logger.debug(
            s"Put object ${putRequest.key}: MD5: ${result.sseCustomerKeyMD5} Metadata: ${result.responseMetadata}"
          )
        }
      }
    }
    resources.reporter.verbose(
      s"Finished transfer of ${fileString(objectMappings.size)}"
    )
  }