in magenta-lib/src/main/scala/magenta/tasks/gcp/GCSTasks.scala [72:161]
def fileString(quantity: Int) =
s"$quantity file${if (quantity != 1) "s" else ""}"
// end-user friendly description of this task
def description: String =
s"Upload ${fileString(objectMappings.size)} to GCS bucket $gcsTargetBucket using file mapping $paths"
// execute this task (should throw on failure)
override def execute(
resources: DeploymentResources,
stopFlag: => Boolean
): Unit = {
if (totalSize == 0) {
val locationDescription = paths
.map {
case (path: S3Path, _) => path.show()
case (location, _) => location.toString
}
.mkString("\n")
resources.reporter.fail(
s"No files found to upload in $locationDescription"
)
}
val withClient = withClientFactory(keyRing, resources)
withClient { client =>
val currentlyDeployedObjectsToDelete =
getCurrentObjectsForDeletion(client)
resources.reporter.verbose(
s"Starting transfer of ${fileString(objectMappings.size)} ($totalSize bytes)"
)
transfers.zipWithIndex.par.foreach { case (transfer, index) =>
logger.debug(s"Transferring $transfer")
index match {
case x if x < 10 =>
resources.reporter.verbose(s"Transferring $transfer")
case 10 =>
resources.reporter.verbose(
s"Not logging details for the remaining ${fileString(objectMappings.size - 10)}"
)
case _ =>
}
GCP.api.retryWhen500orGoogleError(
resources.reporter,
s"GCS Upload $transfer"
) {
val copyObjectRequest = GetObjectRequest
.builder()
.bucket(transfer.source.bucket)
.key(transfer.source.key)
.build()
val inputStream = resources.artifactClient
.getObjectAsBytes(copyObjectRequest)
.asInputStream()
val contentType = Option(transfer.target.getContentType)
.getOrElse(URLConnection.guessContentTypeFromStream(inputStream))
val result = client
.objects()
.insert(
gcsTargetBucket.name,
transfer.target,
new InputStreamContent(contentType, inputStream)
)
.execute()
logger.debug(
s"Put object ${result.getName}: MD5: ${result.getMd5Hash} Metadata: ${result.getMetadata}"
)
result
}
}
currentlyDeployedObjectsToDelete.par.foreach {
case storageObjectToDelete =>
resources.reporter.verbose(
s"Deleting obsolete file from GCP: gcs://${gcsTargetBucket.name}/${storageObjectToDelete.getName}"
)
val errorMessage =
s"Could notremove obselete object ${storageObjectToDelete.getName}"
GCP.api.retryWhen500orGoogleError(resources.reporter, errorMessage) {
client
.objects()
.delete(gcsTargetBucket.name, storageObjectToDelete.getName)
.execute
}
}
}
resources.reporter.verbose(
s"Finished transfer of ${fileString(objectMappings.size)}"
)
}