in lambda/src/main/scala/pricemigrationengine/services/DynamoDBZIOLive.scala [62:79]
private def recursivelyExecuteScanUntilAllResultsAreStreamed(
query: ScanRequest
): ZStream[Any, DynamoDBZIOError, util.Map[String, AttributeValue]] = {
ZStream
.unfoldZIO(Some(query).asInstanceOf[Option[ScanRequest]]) {
case Some(queryRequest) =>
for {
scanResult <- sendScanRequest(queryRequest)
_ <- logging.info(s"Received query results for batch with ${scanResult.count} items")
queryForNextBatch = Option(scanResult.lastEvaluatedKey)
.filterNot(_.isEmpty)
.map(lastEvaluatedKey => queryRequest.copy(x => x.exclusiveStartKey(lastEvaluatedKey)))
} yield Some((scanResult.items.asScala, queryForNextBatch))
case None =>
ZIO.none
}
.flatMap(resultList => ZStream.fromIterable(resultList))
}