app/services/S3.scala (149 lines of code) (raw):

package services import com.typesafe.scalalogging.StrictLogging import io.circe.{Decoder, Encoder} import io.circe.parser.decode import io.circe.syntax._ import services.S3Client._ import software.amazon.awssdk.core.sync.RequestBody import software.amazon.awssdk.services.s3.model.{GetObjectRequest, HeadObjectRequest, ListObjectsRequest, ObjectCannedACL, PutObjectRequest} import zio._ import zio.blocking.effectBlocking import software.amazon.awssdk.services.s3.{S3Client => AwsS3Client} import utils.Circe.noNulls import scala.jdk.CollectionConverters._ case class VersionedS3Data[T](value: T, version: String) trait S3Client { def get: S3Action[RawVersionedS3Data] def update(data: RawVersionedS3Data): S3Action[Unit] def createOrUpdate(data: String): S3Action[Unit] def listKeys: S3Action[List[String]] } object S3Client { type RawVersionedS3Data = VersionedS3Data[String] type S3Action[T] = S3ObjectSettings => ZIO[ZEnv, S3ClientError,T] case class S3ObjectSettings( bucket: String, key: String, publicRead: Boolean, cacheControl: Option[String] = None, surrogateControl: Option[String] = None ) sealed trait S3ClientError extends Throwable case class S3GetObjectError(error: Throwable) extends S3ClientError { override def getMessage = s"Error reading from S3: ${error.getMessage}" } case class S3PutObjectError(error: Throwable) extends S3ClientError { override def getMessage = s"Error writing to S3: ${error.getMessage}" } case object S3VersionMatchError extends S3ClientError { override def getMessage = "Can't save your settings because someone else has updated them since they were last fetched" } case class S3ListObjectsError(error: Throwable) extends S3ClientError { override def getMessage = s"Error getting object list from S3: ${error.getMessage}" } } object S3 extends S3Client with StrictLogging { val s3Client: AwsS3Client = AwsS3Client .builder .region(Aws.region) .credentialsProvider(Aws.credentialsProvider.build) .build def get: S3Action[RawVersionedS3Data] = { objectSettings => val request = GetObjectRequest .builder .bucket(objectSettings.bucket) .key(objectSettings.key) .build ZManaged .fromAutoCloseable(effectBlocking(s3Client.getObject(request))) .use { s3Object => Task { VersionedS3Data( value = scala.io.Source.fromInputStream(s3Object).mkString, version = s3Object.response().versionId() ) } } .mapError { e => logger.error(s"Error reading $objectSettings from S3: ${e.getMessage}", e) S3GetObjectError(e) } } def update(data: RawVersionedS3Data): S3Action[Unit] = { objectSettings => val request = HeadObjectRequest .builder .bucket(objectSettings.bucket) .key(objectSettings.key) .build effectBlocking(s3Client.headObject(request)) .mapError(e => { logger.error(s"Error getting object metadata for $objectSettings: ${e.getMessage}", e) S3GetObjectError(e) }) .flatMap(response => { if (response.versionId() == data.version) { createOrUpdate(data.value)(objectSettings) } else { logger.warn(s"Cannot update S3 object $objectSettings because provided version (${data.version}) does not match latest version (${response.versionId()})") IO.fail(S3VersionMatchError) } }) } def createOrUpdate(data: String): S3Action[Unit] = { objectSettings => UIO.effectTotal { val request = PutObjectRequest.builder .bucket(objectSettings.bucket) .key(objectSettings.key) val requestModifiers: List[Option[PutObjectRequest.Builder => PutObjectRequest.Builder]] = List( objectSettings.cacheControl.map(cc => _.cacheControl(cc)), objectSettings.surrogateControl.map(sc => _.metadata(Map("surrogate-control" -> sc).asJava)), if (objectSettings.publicRead) Some(_.acl(ObjectCannedACL.PUBLIC_READ)) else None ) requestModifiers .flatten .foldLeft(request)((req, modifier) => modifier(req)) .build() }.flatMap { request => effectBlocking { s3Client.putObject(request, RequestBody.fromString(data)) }.unit } .mapError { e => logger.error(s"Error writing $objectSettings to S3: ${e.getMessage}", e) S3PutObjectError(e) } } def listKeys: S3Action[List[String]] = { objectSettings => effectBlocking { val request = ListObjectsRequest .builder .bucket(objectSettings.bucket) .prefix(objectSettings.key) .build val result = s3Client.listObjects(request) result.contents().asScala.toList.map(_.key) }.mapError { e => logger.info(s"Error listing S3 objects for $objectSettings: ${e.getMessage}") S3ListObjectsError(e) } } } object S3Json extends StrictLogging { case class S3JsonError(objectSettings: S3ObjectSettings, error: io.circe.Error) extends Throwable { override def getMessage = s"Error decoding json from S3 ($objectSettings): ${error.getMessage}" } def getFromJson[T : Decoder](s3: S3Client): S3ObjectSettings => ZIO[ZEnv,Throwable,VersionedS3Data[T]] = { objectSettings => s3.get(objectSettings).flatMap { raw => IO.fromEither(decode[T](raw.value)) .map(decoded => raw.copy(value = decoded)) .mapError { error => logger.error(s"Error decoding json from S3 ($objectSettings): ${error.getMessage}", error) S3JsonError(objectSettings, error) } } } def updateAsJson[T: Encoder](data: VersionedS3Data[T])(s3: S3Client): S3Action[Unit] = s3.update(data.copy(value = noNulls(data.value.asJson))) def createOrUpdateAsJson[T: Encoder](data: T)(s3: S3Client): S3Action[Unit] = s3.createOrUpdate(noNulls(data.asJson)) }