def impl()

in lambda/src/main/scala/pricemigrationengine/services/CohortTableLive.scala [113:187]


  def impl(
      cohortSpec: CohortSpec
  ): ZLayer[DynamoDBZIO with StageConfig with CohortTableConfig with Logging, ConfigFailure, CohortTable] = {
    ZLayer.fromZIO {
      for {
        dynamoDbZio <- ZIO.service[DynamoDBZIO]
        stageConfig <- ZIO.service[StageConfig]
        tableName = cohortSpec.tableName(stageConfig.stage)
        cohortTableConfig <- ZIO.service[CohortTableConfig]
        logging <- ZIO.service[Logging]
      } yield new CohortTable {

        override def fetch(
            filter: CohortTableFilter,
            latestStartDateInclusive: Option[LocalDate]
        ): ZStream[Any, CohortFetchFailure, CohortItem] = {
          val indexName =
            latestStartDateInclusive
              .fold(ProcessingStageIndexName)(_ => ProcessingStageAndStartDateIndexName)
          val queryRequest =
            QueryRequest.builder
              .tableName(tableName)
              .indexName(indexName)
              .keyConditionExpression(
                "processingStage = :processingStage" + latestStartDateInclusive.fold("") { _ =>
                  " AND startDate <= :latestStartDateInclusive"
                }
              )
              .expressionAttributeValues(
                List(
                  Some(":processingStage" -> AttributeValue.builder.s(filter.value).build()),
                  latestStartDateInclusive.map { latestStartDateInclusive =>
                    ":latestStartDateInclusive" -> AttributeValue.builder
                      .s(latestStartDateInclusive.toString)
                      .build()
                  }
                ).flatten.toMap.asJava
              )
              .limit(cohortTableConfig.batchSize)
              .build()
          dynamoDbZio.query(queryRequest).mapError(error => CohortFetchFailure(error.toString))
        }

        override def create(cohortItem: CohortItem): IO[Failure, Unit] = {
          dynamoDbZio
            .create(table = tableName, keyName = keyAttribName, value = cohortItem)
            .mapError {
              case DynamoDBZIOError(reason, _: Some[_]) =>
                CohortItemAlreadyPresentFailure(reason)
              case error => CohortCreateFailure(error.toString)
            }
        }

        override def update(cohortItem: CohortItem): ZIO[Any, CohortUpdateFailure, Unit] = {
          dynamoDbZio
            .update(table = tableName, key = CohortTableKey(cohortItem.subscriptionName), value = cohortItem)
            .mapError(error => CohortUpdateFailure(error.toString))
            .tapBoth(
              e => logging.error(s"Failed to update Cohort table: $e"),
              _ => logging.info(s"Wrote ${cohortItem} to Cohort table")
            )
        }

        override def fetchAll(): ZStream[Any, CohortFetchFailure, CohortItem] = {
          val queryRequest = ScanRequest.builder
            .tableName(tableName)
            .limit(cohortTableConfig.batchSize)
            .build()
          for {
            queryResults <- dynamoDbZio.scan(queryRequest).mapError(error => CohortFetchFailure(error.toString))
          } yield queryResults
        }.mapError(error => CohortFetchFailure(error.toString))
      }
    }
  }