membership-attribute-service/app/services/DynamoSupporterProductDataService.scala (71 lines of code) (raw):

package services import com.gu.i18n.Currency import com.gu.monitoring.SafeLogger.LogPrefix import com.gu.monitoring.SafeLogging import models.{Attributes, DynamoSupporterRatePlanItem} import monitoring.CreateMetrics import org.joda.time.{DateTimeZone, LocalDate} import org.scanamo.DynamoReadError.describe import org.scanamo._ import org.scanamo.generic.semiauto._ import org.scanamo.syntax._ import scalaz.std.scalaFuture._ import scalaz.{EitherT, \/} import services.DynamoSupporterProductDataService.errorMessage import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient import utils.SimpleEitherT.SimpleEitherT import scala.concurrent.{ExecutionContext, Future} class SupporterProductDataService( client: DynamoDbAsyncClient, table: String, mapper: SupporterRatePlanToAttributesMapper, createMetrics: CreateMetrics, )(implicit executionContext: ExecutionContext, ) extends SafeLogging { val metrics = createMetrics.forService(classOf[SupporterProductDataService]) // class name referenced in CloudFormation(!) implicit val jodaStringFormat: DynamoFormat[LocalDate] = DynamoFormat.coercedXmap[LocalDate, String, IllegalArgumentException](LocalDate.parse, _.toString) implicit val currencyFormat: DynamoFormat[Currency] = DynamoFormat.xmap[Currency, String](s => Currency.fromString(s).toRight(TypeCoercionError(new Throwable("Invalid currency"))), _.iso) implicit val dynamoSupporterRatePlanItem: DynamoFormat[DynamoSupporterRatePlanItem] = deriveDynamoFormat def getNonCancelledAttributes(identityId: String)(implicit logPrefix: LogPrefix): Future[Either[String, Option[Attributes]]] = { getSupporterRatePlanItems(identityId).map { ratePlanItems => val nonCancelled = ratePlanItems.filter(item => !item.cancellationDate.exists(_.isBefore(LocalDate.now(DateTimeZone.UTC)))) mapper.attributesFromSupporterRatePlans(identityId, nonCancelled) }.toEither } def getSupporterRatePlanItems(identityId: String)(implicit logPrefix: LogPrefix): SimpleEitherT[List[DynamoSupporterRatePlanItem]] = { EitherT( for { futureDynamoResult <- getSupporterRatePlanItemsWithReadErrors(identityId) futureErrors = futureDynamoResult.collect { case Left(error) => error } _ = alertOnDynamoReadErrors(identityId, futureErrors) futureRatePlanItems = futureDynamoResult.collect({ case Right(ratePlanItem) => ratePlanItem }) } yield if (futureErrors.isEmpty || futureRatePlanItems.nonEmpty) \/.right(futureRatePlanItems) else \/.left(errorMessage(identityId, futureErrors)), ) } private def getSupporterRatePlanItemsWithReadErrors(identityId: String): Future[List[Either[DynamoReadError, DynamoSupporterRatePlanItem]]] = ScanamoAsync(client).exec { Table[DynamoSupporterRatePlanItem](table) .query("identityId" === identityId) } private def alertOnDynamoReadErrors(identityId: String, errors: List[DynamoReadError])(implicit logPrefix: LogPrefix) = if (errors.nonEmpty) { logger.error(scrub"${errorMessage(identityId, errors)}") metrics.incrementCount("SupporterProductDataDynamoError") // referenced in CloudFormation } } object DynamoSupporterProductDataService { def errorMessage(identityId: String, errors: List[DynamoReadError]) = s"There were read errors while reading from the SupporterProductData DynamoDB table " + s"for user $identityId\n ${errors.map(describe).mkString("\n")}" }