override def createLogic()

in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBRecovery.scala [134:194]


  /**
   * Creates the stage logic that regroups incoming items into lists for downstream.
   *
   * An item without the `AtomEnd` attribute is emitted right away as a one-element list, and
   * whatever batch was being collected at that point is dropped. Items carrying `AtomEnd` are
   * buffered until the item whose sequence number equals the expected batch end arrives; the
   * buffered items are then emitted in arrival order, but only when exactly `AtomEnd + 1` of
   * them were collected. In every other case the stage asks upstream for the next item.
   *
   * @param attr stream attributes; not consulted by this logic
   */
  override def createLogic(attr: Attributes) =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Sequence number at which the batch being collected ends; NoBatch while nothing is collected.
      var batchEnd = NoBatch
      // Members of the batch being collected, most recently received first.
      var batch = List.empty[Item]

      setHandler(out, this)
      setHandler(in, this)

      override def onPull(): Unit = pull(in)

      override def onPush(): Unit = {
        val item = grab(in)
        if (item.containsKey(AtomEnd)) onAtomMember(item)
        else {
          push(out, item :: Nil)
          // a standalone item ends any batch that was still being collected
          discardBatch()
        }
      }

      /** Handles an item that carries the `AtomEnd` attribute, i.e. a member of a batch. */
      private def onAtomMember(item: Item): Unit = {
        val atomEnd = item.get(AtomEnd).getN.toLong
        val atomIndex = item.get(AtomIndex).getN.toLong
        val seqNr = sn(item)
        // sequence number at which the batch this item claims to belong to ends
        val ownBatchEnd = seqNr - atomIndex + atomEnd

        if (seqNr == batchEnd) {
          val completed: List[Item] =
            if (ownBatchEnd == batchEnd) {
              val ordered = (item :: batch).reverse
              discardBatch()
              ordered
            } else {
              // the item sits where the current batch should end but belongs to another
              // batch: drop what was collected and start over from this item
              restartBatch(item, ownBatchEnd)
              Nil
            }
          // only a batch with the full number of members is passed on
          if (completed.size == (atomEnd + 1)) push(out, completed)
          else pull(in)
        } else {
          val continuesCurrent =
            batchEnd != NoBatch && seqNr <= batchEnd && batchEnd == ownBatchEnd
          if (continuesCurrent) batch ::= item
          else restartBatch(item, ownBatchEnd)
          pull(in)
        }
      }

      /** Forgets the batch collected so far and begins a new one holding only `first`. */
      private def restartBatch(first: Item, end: Long): Unit = {
        batchEnd = end
        batch = first :: Nil
      }

      /** Resets the collection state to "no batch in progress". */
      private def discardBatch(): Unit = {
        batch = Nil
        batchEnd = NoBatch
      }

      /**
       * Computes the sequence number of `item`: the number after the last '-' of its `Key`
       * string, multiplied by `PartitionSize`, plus its numeric `Sort` value.
       *
       * Fails with an `IllegalArgumentException` when the key contains no '-'.
       */
      private def sn(item: Item): Long = {
        val key = item.get(Key).getS
        val sortValue = item.get(Sort).getN.toLong
        val dash = key.lastIndexOf('-')
        require(dash != -1, "unknown key format " + key)
        val partition = key.substring(dash + 1).toLong
        partition * PartitionSize + sortValue
      }

    }