in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBRecovery.scala [134:194]
override def createLogic(attr: Attributes) =
  new GraphStageLogic(shape) with InHandler with OutHandler {
    // Sequence number at which the batch currently being collected ends,
    // or NoBatch when nothing is being collected.
    var batchEnd = NoBatch
    // Items collected for the current batch, newest first (reversed before emission).
    var batch = List.empty[Item]

    setHandler(out, this)
    setHandler(in, this)

    /** Forwards downstream demand straight to the upstream. */
    override def onPull(): Unit = pull(in)

    /**
     * Handles one incoming item and answers with exactly one signal: either a
     * `push` downstream or a `pull` for the next item.
     *
     * Items without the `AtomEnd` attribute are emitted immediately as a
     * one-element list, and any batch collected so far is dropped. Items
     * carrying `AtomEnd` are accumulated; a batch is emitted only when its
     * closing item arrives and exactly `AtomEnd + 1` items were collected.
     */
    override def onPush(): Unit = {
      val item = grab(in)
      if (!item.containsKey(AtomEnd)) {
        push(out, item :: Nil)
        // throw away possible incomplete batch
        dropBatch()
      } else {
        val lastIndex = item.get(AtomEnd).getN.toLong
        val ownIndex = item.get(AtomIndex).getN.toLong
        val seqNr = sn(item)
        // Sequence number at which this item's own batch ends
        // (presumably AtomIndex is the item's zero-based position in its batch).
        val ownBatchEnd = seqNr - ownIndex + lastIndex

        if (seqNr != batchEnd) {
          // Not the closing item of the batch being tracked: it either extends
          // that batch or becomes the first item of a fresh one.
          val extendsCurrent =
            batchEnd != NoBatch && seqNr <= batchEnd && batchEnd == ownBatchEnd
          if (extendsCurrent) batch = item :: batch
          else beginBatch(item, ownBatchEnd)
          pull(in)
        } else {
          val candidate: List[Item] =
            if (ownBatchEnd != batchEnd) {
              // foul play detected, scrap this batch
              beginBatch(item, ownBatchEnd)
              Nil
            } else {
              val inOrder = (item :: batch).reverse
              dropBatch()
              inOrder
            }
          // Only a batch with exactly AtomEnd + 1 items goes downstream.
          if (candidate.size == lastIndex + 1) push(out, candidate)
          else pull(in)
        }
      }
    }

    /** Replaces the tracked batch with one that holds only `first` and ends at `end`. */
    private def beginBatch(first: Item, end: Long): Unit = {
      batchEnd = end
      batch = first :: Nil
    }

    /** Forgets whatever has been collected so far. */
    private def dropBatch(): Unit = {
      batchEnd = NoBatch
      batch = Nil
    }

    /**
     * Derives an item's sequence number from its `Key` and `Sort` attributes:
     * the number after the last '-' of the key string, multiplied by
     * `PartitionSize`, plus the numeric sort value.
     *
     * Throws `IllegalArgumentException` (via `require`) when the key has no '-'.
     */
    private def sn(item: Item): Long = {
      val keyText = item.get(Key).getS
      val offset = item.get(Sort).getN.toLong
      val dashAt = keyText.lastIndexOf('-')
      require(dashAt != -1, "unknown key format " + keyText)
      val partition = keyText.substring(dashAt + 1).toLong
      partition * PartitionSize + offset
    }
  }