in src/main/scala/org/apache/pekko/persistence/dynamodb/query/scaladsl/internal/DynamoDBCurrentPersistenceIdsQuery.scala [127:150]
private def queryPersistenceIds(fromPersistenceId: Option[String])(
exclusiveStartKey: Option[java.util.Map[String, AttributeValue]]) = {
def queryRequest(exclusiveStartKey: Option[java.util.Map[String, AttributeValue]]): QueryRequest = {
val req = new QueryRequest()
.withTableName(readJournalSettings.Table)
.withIndexName(readJournalSettings.PersistenceIdsIndexName)
.withProjectionExpression("par")
fromPersistenceId match {
case Some(persistenceId) =>
req
.withKeyConditionExpression("num = :n AND par > :p")
.withExpressionAttributeValues(
Map(":n" -> 1.toAttribute, ":p" -> messagePartitionKeyFromGroupNr(persistenceId, 0).toAttribute).asJava)
case None =>
req.withKeyConditionExpression("num = :n").withExpressionAttributeValues(Map(":n" -> 1.toAttribute).asJava)
}
exclusiveStartKey.foreach(esk => req.withExclusiveStartKey(esk))
req
}
dynamo.query(queryRequest(exclusiveStartKey))
}