in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBRecovery.scala [250:284]
/**
 * Loads the journal items of a single partition that fall inside the sort-key range
 * spanned by the requested sequence numbers.
 *
 * A consistent-read `Query` is issued against `JournalTable`, restricted to the partition key
 * derived from `persistenceId` and `partitionKeys.partitionSeqNum`, with the sort key
 * `BETWEEN` the smallest and the largest requested sequence number (each taken modulo
 * `PartitionSize`). Result pages are followed through `LastEvaluatedKey` until DynamoDB
 * reports no further page, and all items are accumulated in page order.
 *
 * @param persistenceId  persistence id whose events are read
 * @param partitionKeys  partition number plus the sequence numbers wanted from that partition
 * @return a [[ReplayBatch]] holding every item returned by the query together with a map from
 *         each requested message key attribute to `sequenceNr / PartitionSize`; when no
 *         sequence numbers are requested the batch is empty and no query is sent
 */
def getPartitionItems(persistenceId: String, partitionKeys: PartitionKeys): Future[ReplayBatch] = {
  val eventNums = partitionKeys.partitionEventNums

  // Key attribute of every requested message -> sequenceNr / PartitionSize
  // (presumably the partition number the message lives in; confirm against ReplayBatch).
  val batchKeysMap =
    eventNums.iterator.map(seqNr => messageKey(persistenceId, seqNr).get(Key) -> (seqNr / PartitionSize)).toMap

  if (eventNums.isEmpty)
    // Nothing was requested: there is no sort-key range to query, so answer with an empty batch
    // instead of failing synchronously on the head/last of an empty collection.
    Future.successful(ReplayBatch(Seq.empty[Item], batchKeysMap))
  else {
    // Only the extremes are needed for the BETWEEN bounds, so min/max replaces a full sort.
    val startSortKey = eventNums.min % PartitionSize
    val endSortKey = eventNums.max % PartitionSize

    // These values are identical for every page of the query, so build them once.
    val expressionValues =
      Map(
        ":kkey" -> S(messagePartitionKeyFromGroupNr(persistenceId, partitionKeys.partitionSeqNum)),
        ":startSKey" -> N(startSortKey),
        ":endSKey" -> N(endSortKey)).asJava
    val projection = ItemAttributesForReplay.mkString(",")

    // Builds the request for one page; `exclusiveStartKeyOpt` is set for every page but the first.
    def pageRequest(exclusiveStartKeyOpt: Option[java.util.Map[String, AttributeValue]]): QueryRequest = {
      val request = new QueryRequest()
        .withTableName(JournalTable)
        .withKeyConditionExpression(s"$Key = :kkey AND $Sort BETWEEN :startSKey AND :endSKey")
        .withExpressionAttributeValues(expressionValues)
        .withProjectionExpression(projection)
        .withConsistentRead(true)
        .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
      exclusiveStartKeyOpt.foreach(startKey => request.withExclusiveStartKey(startKey))
      request
    }

    // Runs the query and keeps following LastEvaluatedKey, appending each page to `acc`.
    // The recursion happens inside the Future callback, so it does not grow the call stack.
    def fetchAllPages(queryReq: QueryRequest, acc: Seq[Item]): Future[Seq[Item]] =
      dynamo.query(queryReq).flatMap { result =>
        val collected = acc ++ result.getItems.asScala.toSeq
        // A null or empty LastEvaluatedKey means this was the final page.
        Option(result.getLastEvaluatedKey).filterNot(_.isEmpty) match {
          case Some(lastKey) => fetchAllPages(pageRequest(Some(lastKey)), collected)
          case None          => Future.successful(collected)
        }
      }

    fetchAllPages(pageRequest(None), Seq.empty).map(items => ReplayBatch(items, batchKeysMap))
  }
}