private def recursivelyExecuteScanUntilAllResultsAreStreamed()

in lambda/src/main/scala/pricemigrationengine/services/DynamoDBZIOLive.scala [62:79]


        /** Streams every item returned by a DynamoDB scan, following pagination until no further pages remain.
          *
          * Each page is fetched with `sendScanRequest`, its item count is logged, and its items are emitted
          * individually downstream. If the response carries a non-empty `lastEvaluatedKey`, the same request is
          * re-issued with that key as `exclusiveStartKey`; otherwise the stream ends after the current page.
          *
          * @param query
          *   the initial scan request; subsequent pages reuse it with only `exclusiveStartKey` changed
          * @return
          *   a stream of the scanned items, failing with the error produced by `sendScanRequest` if a page
          *   cannot be fetched
          */
        private def recursivelyExecuteScanUntilAllResultsAreStreamed(
            query: ScanRequest
        ): ZStream[Any, DynamoDBZIOError, util.Map[String, AttributeValue]] =
          ZStream
            // paginateZIO emits each page and keeps going only while the next state is defined, so no
            // Option-wrapped seed (or cast) and no explicit terminal branch are needed.
            .paginateZIO(query) { queryRequest =>
              for {
                scanResult <- sendScanRequest(queryRequest)
                _ <- logging.info(s"Received query results for batch with ${scanResult.count} items")
                // A null or empty lastEvaluatedKey is treated as "no more pages".
                queryForNextBatch = Option(scanResult.lastEvaluatedKey)
                  .filterNot(_.isEmpty)
                  .map(lastEvaluatedKey => queryRequest.copy(builder => builder.exclusiveStartKey(lastEvaluatedKey)))
              } yield (scanResult.items.asScala, queryForNextBatch)
            }
            // Flatten each page into its individual items.
            .flatMap(resultList => ZStream.fromIterable(resultList))