in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBRecovery.scala [52:77]
def ids: Seq[Long] = items.map(itemToSeq).sorted
private def itemToSeq(i: Item): Long = map(i.get(Key)) * PartitionSize + i.get(Sort).getN.toInt
}
}
/**
* A simple data structure representing a Partition Key sequence number and the event numbers contained within it.
*
* @param partitionSeqNum - the partition sequence number for the given persistence id.
* @param partitionEventNums - will be 0-99, representing the event ordering within the given partition sequence.
*/
case class PartitionKeys(partitionSeqNum: Long, partitionEventNums: immutable.Seq[Long])
/**
* Groups Longs from a stream into a [PartitionKeys] whereas each sequence shall contain the values that would be within the
* given partition size (represented by n)
*/
object DynamoPartitionGrouped extends GraphStage[FlowShape[Long, PartitionKeys]] {
val in = Inlet[Long]("DynamoEventNum.in")
val out = Outlet[PartitionKeys]("DynamoPartitionKeys.out")
override val initialAttributes = Attributes.name("DynamoPartitionGrouped")
override val shape: FlowShape[Long, PartitionKeys] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {