in magenta-lib/src/main/scala/magenta/tasks/tasks.scala [77:176]
// Renders a file count with the noun pluralised for every quantity except 1,
// e.g. "1 file", "0 files", "3 files".
def fileString(quantity: Int) = {
  val pluralSuffix = if (quantity == 1) "" else "s"
  s"$quantity file$pluralSuffix"
}
// End-user friendly summary of this task: how many files are uploaded, the
// destination bucket, and the file mapping in use.
def description: String = {
  val fileCount = fileString(objectMappings.size)
  s"Upload $fileCount to S3 bucket $bucket using file mapping $paths"
}
// Describes a single transfer for log output: the source and target S3 URIs
// followed by the cache-control, content-type and public-read settings carried
// by the request.
def requestToString(source: S3Object, request: PutReq): String = {
  val from = s"s3://${source.bucket}/${source.key}"
  val to = s"s3://${request.target.bucket}/${request.target.key}"
  val settings =
    s"CacheControl:${request.cacheControl} ContentType:${request.contentType} PublicRead:${request.publicReadAcl}"
  s"$from to $to with $settings"
}
// Execute this task (should throw on failure).
//
// Copies every entry of `requests` from the artifact client to its target,
// streaming each object through an in-memory pipe, and reports progress via
// `resources.reporter`.
override def execute(
    resources: DeploymentResources,
    stopFlag: => Boolean
): Unit = {
  // Nothing to transfer: report the configured locations as a failure.
  // NOTE(review): reporter.fail is presumably non-returning — confirm.
  if (totalSize == 0) {
    val describedPaths = paths.map {
      case (s3Path: S3Path, _) => s3Path.show()
      case (other, _)          => other.toString
    }
    val locationDescription = describedPaths.mkString("\n")
    resources.reporter.fail(
      s"No files found to upload in $locationDescription"
    )
  }

  // The client is built with the no-retry configuration; each transfer below
  // is wrapped in retryOnException instead.
  val runWithClient = withClientFactory(
    keyRing,
    region,
    AWS.clientConfigurationNoRetry,
    resources
  )

  runWithClient { s3 =>
    resources.reporter.verbose(
      s"Starting transfer of ${fileString(objectMappings.size)} ($totalSize bytes)"
    )

    requests.zipWithIndex.par.foreach { case (upload, position) =>
      // A def rather than a val so the text is only built where it is used.
      def transferMessage: String =
        s"Transferring ${requestToString(upload.source, upload)}"

      logger.debug(transferMessage)

      // Positions 0-9 are reported individually; position 10 emits a single
      // note covering the rest; later positions are not reported.
      if (position < 10) {
        resources.reporter.verbose(transferMessage)
      } else if (position == 10) {
        resources.reporter.verbose(
          s"Not logging details for the remaining ${fileString(objectMappings.size - 10)}"
        )
      }

      retryOnException(AWS.clientConfiguration) {
        val download = GetObjectRequest
          .builder()
          .bucket(upload.source.bucket)
          .key(upload.source.key)
          .build()

        // Pipe with a 1 MiB buffer: the download writes into pipeOut while
        // the upload reads from pipeIn.
        val pipeOut = new PipedOutputStream()
        val pipeIn = new PipedInputStream(pipeOut, 1024 * 1024)
        val intoPipe: ResponseTransformer[GetObjectResponse, GetObjectResponse] =
          ResponseTransformer.toOutputStream(pipeOut)

        // Started before the upload so the pipe has a writer; runs on the IO
        // execution context. The write end is always closed afterwards.
        val downloaded: Future[GetObjectResponse] = Future {
          try resources.artifactClient.getObject(download, intoPipe)
          finally pipeOut.close()
        }(resources.ioExecutionContext)

        val putRequest: PutObjectRequest = upload.toAwsRequest
        val putResponse =
          try {
            val body = AWSRequestBody.fromContentProvider(
              () => pipeIn,
              upload.source.size,
              upload.mimeType
            )
            s3.putObject(putRequest, body)
          } finally {
            // Close the read end whether or not the upload succeeded.
            pipeIn.close()
          }

        import scala.concurrent.duration._
        // Wait (up to five minutes) for the download side; a failure there is
        // rethrown here.
        Await.result(downloaded, 5.minutes)
        logger.debug(
          s"Put object ${putRequest.key}: MD5: ${putResponse.sseCustomerKeyMD5} Metadata: ${putResponse.responseMetadata}"
        )
      }
    }
  }

  resources.reporter.verbose(
    s"Finished transfer of ${fileString(objectMappings.size)}"
  )
}