backend/app/utils/aws/S3Client.scala (69 lines of code) (raw):

package utils.aws import java.io.{ByteArrayInputStream, InputStream} import java.nio.file.Path import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model._ import com.amazonaws.services.s3.transfer.internal.S3ProgressListener import com.amazonaws.services.s3.transfer.model.UploadResult import com.amazonaws.services.s3.transfer.{PersistableTransfer, TransferManager} import com.amazonaws.{AmazonServiceException, event} import org.apache.commons.io.IOUtils import services.S3Config import utils.attempt.Attempt import utils.{AwsCredentials, AwsS3Clients} import scala.concurrent.ExecutionContext import scala.language.implicitConversions class S3Client(config: S3Config)(implicit executionContext: ExecutionContext) { val credentials = AwsCredentials(accessKey = config.accessKey, secretKey = config.secretKey) val (aws: AmazonS3, tm: TransferManager) = AwsS3Clients(credentials, config.region, config.endpoint) def attemptS3[T](f: => T): Attempt[T] = Attempt.catchNonFatal(f)(AwsErrors.exceptionToFailure) // Minio only works with the deprecated method on the client. This is a copy paste into our code to avoid the warnings def doesBucketExist(bucket: String): Boolean = try { aws.headBucket(new HeadBucketRequest(bucket)) true } catch { case ase: AmazonServiceException if ase.getStatusCode == 301 || ase.getStatusCode == 403 => true case ase: AmazonServiceException if ase.getStatusCode == 404 => false } // TODO MRB: these should all be attempty def putObjectSync(bucket: String, key: String, contentType: Option[String], contentLength: Long, is: InputStream): PutObjectResult = { val metadata = createMetadata(contentType, Some(contentLength)) val request = new PutObjectRequest(bucket, key, is, metadata) try { aws.putObject(request) } finally { is.close() } } def putObjectSync(bucket: String, key: String, contentType: Option[String], file: Path): PutObjectResult = { val metadata = createMetadata(contentType) val request = new PutObjectRequest(bucket, key, file.toFile).withMetadata(metadata) aws.putObject(request) } def putLargeObject(bucket: String, key: String, contentType: Option[String], file: Path, updateCallback: event.ProgressEvent => Unit = _ => ()): UploadResult = { val metadata = createMetadata(contentType) val request = new PutObjectRequest(bucket, key, file.toFile).withMetadata(metadata) val progressListener = new S3ProgressListener{ override def onPersistableTransfer(persistableTransfer: PersistableTransfer): Unit = {} override def progressChanged(progressEvent: event.ProgressEvent): Unit = updateCallback(progressEvent) } tm.upload(request, progressListener).waitForUploadResult() } def putObjectSync(bucket: String, key: String, contentType: Option[String], data: Array[Byte]): PutObjectResult = { val metadata = createMetadata(contentType, Some(data.length)) val request = new PutObjectRequest(bucket, key, new ByteArrayInputStream(data), metadata) aws.putObject(request) } def putObject(bucket: String, key: String, contentType: Option[String], data: Array[Byte]): Attempt[PutObjectResult] = attemptS3 { putObjectSync(bucket, key, contentType, data) } def listObjects(bucket: String, prefix: String): Attempt[ObjectListing] = attemptS3(aws.listObjects(bucket, prefix)) private def createMetadata(contentType: Option[String], contentLength: Option[Long] = None): ObjectMetadata = { val metadata = new ObjectMetadata() contentType.foreach(metadata.setContentType) contentLength.foreach(metadata.setContentLength) config.sseAlgorithm.foreach(metadata.setSSEAlgorithm) metadata } }