def getPartitionItems()

in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBRecovery.scala [250:284]


  def getPartitionItems(persistenceId: String, partitionKeys: PartitionKeys): Future[ReplayBatch] = {
    val sortedNrs = partitionKeys.partitionEventNums.sorted.map(_ % PartitionSize)
    val startSortKey = sortedNrs.head
    val endSortKey = sortedNrs.last

    val queryRequestBuilder: (Option[java.util.Map[String, AttributeValue]]) => QueryRequest = exclusiveStartKeyOpt => {
      val request = new QueryRequest()
        .withTableName(JournalTable)
        .withKeyConditionExpression(s"$Key = :kkey AND $Sort BETWEEN :startSKey AND :endSKey")
        .withExpressionAttributeValues(
          Map(
            ":kkey" -> S(messagePartitionKeyFromGroupNr(persistenceId, partitionKeys.partitionSeqNum)),
            ":startSKey" -> N(startSortKey),
            ":endSKey" -> N(endSortKey)).asJava)
        .withProjectionExpression(ItemAttributesForReplay.mkString(","))
        .withConsistentRead(true)
        .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
      exclusiveStartKeyOpt.foreach(startKey => request.withExclusiveStartKey(startKey))
      request
    }

    def dynamoSummingPager(queryReq: QueryRequest, acc: Seq[Item]): Future[Seq[Item]] = {
      dynamo.query(queryReq).flatMap { result =>
        val currentPageItems = result.getItems.asScala.toSeq
        if (result.getLastEvaluatedKey == null || result.getLastEvaluatedKey.isEmpty)
          Future.successful(acc ++ currentPageItems)
        else
          dynamoSummingPager(queryRequestBuilder(Some(result.getLastEvaluatedKey)), acc ++ currentPageItems)
      }
    }

    val batchKeys = partitionKeys.partitionEventNums.map(s => messageKey(persistenceId, s) -> (s / PartitionSize))
    val batchKeysMap = batchKeys.iterator.map(p => p._1.get(Key) -> p._2).toMap
    dynamoSummingPager(queryRequestBuilder(None), Seq.empty).map(result => ReplayBatch(result, batchKeysMap))
  }