in lambda/src/main/scala/pricemigrationengine/services/DynamoDBZIOLive.scala [18:41]
/**
  * Runs the given DynamoDB query and streams every returned item decoded as `A`.
  *
  * The raw attribute maps come from `recursivelyExecuteQueryUntilAllResultsAreStreamed`;
  * each one is then passed through the implicit deserialiser.
  *
  * @param query        the query request to execute
  * @param deserializer converts a raw attribute map into an `A`
  * @tparam A           the type each item is decoded into
  * @return a stream of decoded items, failing with a [[DynamoDBZIOError]]
  */
override def query[A](
    query: QueryRequest
)(implicit deserializer: DynamoDBDeserialiser[A]): ZStream[Any, DynamoDBZIOError, A] = {
  val rawItems = recursivelyExecuteQueryUntilAllResultsAreStreamed(query)
  rawItems.mapZIO(item => deserializer.deserialise(item))
}
/**
  * Streams the raw items matched by `query`, one page (batch) at a time.
  *
  * Each page is fetched with `sendQueryRequest`. While a response carries a non-empty
  * `lastEvaluatedKey`, the same request is re-issued with that key set as its
  * `exclusiveStartKey`; the stream ends after the first page without one.
  *
  * @param query the initial query request
  * @return a stream of the items from every page, in the order the pages were received
  */
private def recursivelyExecuteQueryUntilAllResultsAreStreamed(
    query: QueryRequest
): ZStream[Any, DynamoDBZIOError, util.Map[String, AttributeValue]] =
  ZStream
    // paginateZIO emits each page and continues only while a next request is supplied,
    // so no Option-typed seed (and no cast to widen it) is required.
    .paginateZIO(query) { queryRequest =>
      for {
        queryResult <- sendQueryRequest(queryRequest)
        items = queryResult.items.asScala
        _ <- logging.info(s"Received query results for batch with ${items.length} items")
        // Option(...) guards against null; an empty key map is also treated as "no more pages".
        queryForNextBatch = Option(queryResult.lastEvaluatedKey)
          .filterNot(_.isEmpty)
          .map(lastEvaluatedKey => queryRequest.copy(builder => builder.exclusiveStartKey(lastEvaluatedKey)))
      } yield (items, queryForNextBatch)
    }
    // Flatten each page into its individual items.
    .flatMap(page => ZStream.fromIterable(page))