in lambda/src/main/scala/pricemigrationengine/services/CohortTableLive.scala [113:187]
def impl(
cohortSpec: CohortSpec
): ZLayer[DynamoDBZIO with StageConfig with CohortTableConfig with Logging, ConfigFailure, CohortTable] = {
ZLayer.fromZIO {
for {
dynamoDbZio <- ZIO.service[DynamoDBZIO]
stageConfig <- ZIO.service[StageConfig]
tableName = cohortSpec.tableName(stageConfig.stage)
cohortTableConfig <- ZIO.service[CohortTableConfig]
logging <- ZIO.service[Logging]
} yield new CohortTable {
/** Streams cohort items whose `processingStage` equals `filter.value`.
  *
  * When `latestStartDateInclusive` is defined, the query runs against
  * `ProcessingStageAndStartDateIndexName` and additionally requires
  * `startDate <= latestStartDateInclusive`; otherwise it runs against
  * `ProcessingStageIndexName` with the processing-stage condition only.
  *
  * @param filter
  *   processing stage to select; its `value` is bound to `:processingStage`
  * @param latestStartDateInclusive
  *   optional upper bound on `startDate`, sent as the date's `toString`
  * @return
  *   the stream produced by `dynamoDbZio.query`, with any error converted to a
  *   [[CohortFetchFailure]] carrying the error's `toString`
  */
override def fetch(
    filter: CohortTableFilter,
    latestStartDateInclusive: Option[LocalDate]
): ZStream[Any, CohortFetchFailure, CohortItem] = {
  val stageCondition = "processingStage = :processingStage"
  val stageBinding = ":processingStage" -> AttributeValue.builder.s(filter.value).build()

  // Index, key condition and bound values all depend on whether a start-date bound was
  // supplied, so they are derived together from a single inspection of the option.
  val (indexName, keyCondition, bindings) = latestStartDateInclusive match {
    case None =>
      (ProcessingStageIndexName, stageCondition, Map(stageBinding))
    case Some(latestStartDate) =>
      (
        ProcessingStageAndStartDateIndexName,
        stageCondition + " AND startDate <= :latestStartDateInclusive",
        Map(
          stageBinding,
          ":latestStartDateInclusive" -> AttributeValue.builder.s(latestStartDate.toString).build()
        )
      )
  }

  val request =
    QueryRequest.builder
      .tableName(tableName)
      .indexName(indexName)
      .keyConditionExpression(keyCondition)
      .expressionAttributeValues(bindings.asJava)
      // NOTE(review): presumably this bounds each underlying request rather than the whole
      // stream; paging is handled inside dynamoDbZio.query, which is not visible here.
      .limit(cohortTableConfig.batchSize)
      .build()

  dynamoDbZio.query(request).mapError(failure => CohortFetchFailure(failure.toString))
}
/** Creates `cohortItem` in the cohort table via `dynamoDbZio.create`, using `keyAttribName`
  * as the key name.
  *
  * Error mapping:
  *   - a `DynamoDBZIOError` whose second field is defined becomes a
  *     [[CohortItemAlreadyPresentFailure]] carrying the error's reason
  *     (NOTE(review): presumably the defined field signals that the key already exists;
  *     confirm against `DynamoDBZIO.create`);
  *   - any other error becomes a [[CohortCreateFailure]] carrying the error's `toString`.
  *
  * @param cohortItem
  *   the item to insert
  */
override def create(cohortItem: CohortItem): IO[Failure, Unit] = {
  val insertion =
    dynamoDbZio.create(table = tableName, keyName = keyAttribName, value = cohortItem)

  insertion.mapError {
    case DynamoDBZIOError(reason, Some(_)) => CohortItemAlreadyPresentFailure(reason)
    case other                             => CohortCreateFailure(other.toString)
  }
}
/** Updates the cohort table entry keyed by `cohortItem.subscriptionName` with `cohortItem`,
  * and logs the outcome.
  *
  * Any error from `dynamoDbZio.update` is converted to a [[CohortUpdateFailure]] carrying the
  * error's `toString`. That failure is logged at error level; a successful write is logged at
  * info level.
  *
  * @param cohortItem
  *   the item whose values are written
  */
override def update(cohortItem: CohortItem): ZIO[Any, CohortUpdateFailure, Unit] = {
  val itemKey = CohortTableKey(cohortItem.subscriptionName)

  val write =
    dynamoDbZio
      .update(table = tableName, key = itemKey, value = cohortItem)
      .mapError(cause => CohortUpdateFailure(cause.toString))

  // Logging is attached after the error mapping so the logged failure is the mapped one.
  write.tapBoth(
    failure => logging.error(s"Failed to update Cohort table: $failure"),
    _ => logging.info(s"Wrote ${cohortItem} to Cohort table")
  )
}
/** Streams the items returned by a scan of the cohort table.
  *
  * The scan request targets `tableName` and sets its limit to `cohortTableConfig.batchSize`.
  *
  * @return
  *   the stream produced by `dynamoDbZio.scan`, with any error converted to a
  *   [[CohortFetchFailure]] carrying the error's `toString`
  */
override def fetchAll(): ZStream[Any, CohortFetchFailure, CohortItem] = {
  val scanRequest =
    ScanRequest.builder
      .tableName(tableName)
      .limit(cohortTableConfig.batchSize)
      .build()

  // The error is mapped exactly once, matching `fetch`. A second mapping would wrap the
  // already-converted failure's string form inside another CohortFetchFailure.
  dynamoDbZio.scan(scanRequest).mapError(error => CohortFetchFailure(error.toString))
}
}
}
}