override def createLogicAndMaterializedValue()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala [129:508]


  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Control) = {
    val logic = new TimerGraphStageLogic(shape) with OutHandler with StageLogging with Control {

      // Log under the stage class rather than this anonymous logic class.
      override protected def logSource: Class[_] =
        classOf[EventsByPersistenceIdStage]

      // Execution context for the `onComplete` callbacks registered on futures in this logic.
      implicit def ec: ExecutionContext = materializer.executionContext

      // Backs the `done` materialized value: failed in onFailure, succeeded in postStop.
      val donePromise = Promise[Done]()

      // Sequence number the next emitted row is expected to carry.
      var expectedNextSeqNr = 0L // initialized in preStart
      // Partition currently being read; `query` targets `partition + 1` when switching partition.
      var partition = 0L
      // Number of rows pushed downstream so far, compared against `max` in reachedEndCondition.
      var count = 0L

      // Known seqNr of an external poll that arrived while the stage was not idle.
      var pendingPoll: Option[Long] = None
      // Target seqNr of an external fast-forward that arrived while the stage was not idle.
      var pendingFastForward: Option[Long] = None
      // Defined while the stage is re-querying for a sequence number that was skipped in a result set.
      var lookingForMissingSeqNr: Option[MissingSeqNr] = None

      // Current phase of the query cycle: QueryIdle, QueryInProgress or QueryResult.
      var queryState: QueryState = QueryIdle

      // Async callback receiving the outcome of an event query (initial page or a fetched next page).
      // A failure fails the stage; a success is stored as the current QueryResult and consumed at once.
      val newResultSetCb = getAsyncCallback[Try[AsyncResultSet]] {
        case Failure(cause) =>
          onFailure(cause)
        case Success(resultSet) =>
          queryState match {
            case inProgress: QueryInProgress =>
              // a page that is exhausted on arrival only counts as "empty" when it is not a follow-up page
              val nothingFound = isExhausted(resultSet) && !inProgress.fetchMore
              if (log.isDebugEnabled) {
                val elapsedMs = (System.nanoTime() - inProgress.startTime).nanos.toMillis
                val suffix = if (nothingFound) "(empty)" else ""
                log.debug("EventsByPersistenceId [{}] Query took [{}] ms {}", persistenceId, elapsedMs, suffix)
              }
              queryState = QueryResult(resultSet, nothingFound, inProgress.switchPartition)
              tryPushOne()
            case unexpected =>
              throw new IllegalStateException(s"New ResultSet when in unexpected state $unexpected")
          }
      }

      // Async callback behind `poll`: an external hint that events up to the given seqNr exist.
      // Queries immediately when idle, otherwise remembers the hint in `pendingPoll`.
      val pollCb = getAsyncCallback[Long] { latestKnownSeqNr =>
        refreshInterval match {
          case None =>
            throw new IllegalStateException("External poll only possible for live queries")
          case Some(_) =>
            val alreadyEmitted = latestKnownSeqNr < expectedNextSeqNr
            if (!alreadyEmitted) {
              log.debug("EventsByPersistenceId [{}] External poll, known seqNr [{}]", persistenceId, latestKnownSeqNr)
              queryState match {
                case _: QueryResult | _: QueryInProgress =>
                  pendingPoll = Some(latestKnownSeqNr)
                case QueryIdle =>
                  query(switchPartition = false)
              }
            }
        }
      }

      // Async callback behind `fastForward`: asks the stage to jump ahead to the given seqNr.
      // Applied immediately when idle, otherwise parked in `pendingFastForward`.
      val fastForwardCb = getAsyncCallback[Long] { requestedSeqNr =>
        if (refreshInterval.isEmpty) {
          throw new IllegalStateException("Fast forward only possible for live queries")
        } else if (!fastForwardEnabled) {
          throw new IllegalStateException("Fast forward has been disabled")
        }

        log.debug(
          "Fast forward request being processed: Next Sequence Nr: {} Current Sequence Nr: {}",
          requestedSeqNr,
          expectedNextSeqNr)

        val movesForward = requestedSeqNr > expectedNextSeqNr
        queryState match {
          case _ if !movesForward => // target is not ahead of the current position, nothing to do
          case QueryIdle =>
            internalFastForward(requestedSeqNr)
          case _ =>
            log.debug("Query in progress. Fast forward pending.")
            pendingFastForward = Some(requestedSeqNr)
        }
      }

      // Async callback receiving the highest deleted sequence number requested in preStart.
      // Positions the stage on the first sequence number to read and issues the initial query.
      val highestDeletedSequenceNrCb = getAsyncCallback[Try[Long]] {
        case Failure(cause) =>
          onFailure(cause)

        case Success(highestDeleted) =>
          val firstAfterDeleted = highestDeleted + 1
          // never start below the requested offset, and never below 1 (the lowest possible seqNr)
          expectedNextSeqNr = firstAfterDeleted.max(fromSeqNr).max(1L)
          partition = partitionNr(expectedNextSeqNr)
          // replace the placeholder QueryInProgress set by preStart so `query` accepts the call
          queryState = QueryIdle
          query(switchPartition = false)
      }

      // Async callback receiving the result of a `checkForGaps` probe together with the number of
      // empty partitions seen so far.
      //  - no row (or seqNr 0): the partition is empty; either give up and complete the stage once
      //    the count has reached 5, or move to the next partition and probe again
      //  - a row: there is data further on, resume the regular query
      //  - failure: the probe query itself failed, so fail the stage with that cause
      val checkForGapsCb: AsyncCallback[(Int, Try[Option[Row]])] = getAsyncCallback {
        case (foundEmptyPartitionCount, result) =>
          result match {
            case Success(mbRow) =>
              mbRow.map(_.getLong("sequence_nr")) match {
                case None | Some(0) =>
                  // Some(0) when old schema with static used column, everything deleted in this partition
                  if (foundEmptyPartitionCount == 5)
                    completeStage()
                  else {
                    partition = partition + 1
                    checkForGaps(foundEmptyPartitionCount + 1)
                  }
                case Some(_) =>
                  // on the very first probe `partition` still points at the exhausted partition
                  if (foundEmptyPartitionCount == 0)
                    partition = partition + 1
                  query(switchPartition = false)
              }
            case Failure(e) =>
              // The probe is a real query and can fail like any other. Report the actual cause and
              // fail `donePromise`; otherwise postStop would complete `done` successfully.
              onFailure(e)
          }
      }

      // Moves the read position forward to `nextSeqNr`. The partition only ever advances:
      // it is left untouched when `nextSeqNr` maps to the current or an earlier partition.
      private def internalFastForward(nextSeqNr: Long): Unit = {
        val previousSeqNr = expectedNextSeqNr
        log.debug(
          "EventsByPersistenceId [{}] External fast-forward to seqNr [{}] from current [{}]",
          persistenceId,
          nextSeqNr,
          previousSeqNr)
        expectedNextSeqNr = nextSeqNr
        partition = math.max(partition, partitionNr(nextSeqNr))
      }

      // Maps a sequence number to its partition number: sequence numbers are shifted to start
      // at 0 and then divided by the configured target partition size.
      def partitionNr(sequenceNr: Long): Long = {
        val zeroBasedSeqNr = sequenceNr - 1L
        zeroBasedSeqNr / journalSettings.targetPartitionSize
      }

      // Starts the stage: looks up the highest deleted sequence number (which triggers the
      // initial query from its callback) and, for live queries, schedules the periodic Continue tick.
      override def preStart(): Unit = {
        // placeholder state until the highest-deleted lookup has completed
        queryState = QueryInProgress(switchPartition = false, fetchMore = false, System.nanoTime())
        session.highestDeletedSequenceNumber(persistenceId).onComplete(highestDeletedSequenceNrCb.invoke)

        refreshInterval.foreach { interval =>
          val initialDelay: FiniteDuration =
            if (interval < 2.seconds) interval
            else {
              // random first delay between half an interval and a full interval
              val jitter = ThreadLocalRandom.current().nextLong(interval.toMillis / 2).millis
              (interval / 2) + jitter
            }
          scheduleContinue(initialDelay, interval)
        }
      }

      // Schedules the recurring Continue timer: first tick after `initial`, then every `interval`.
      // Kept in its own method so the deprecation warning suppression stays local to this call.
      @nowarn("msg=deprecated")
      private def scheduleContinue(initial: FiniteDuration, interval: FiniteDuration): Unit = {
        schedulePeriodicallyWithInitialDelay(Continue, initial, interval)
      }

      // Releases the last query state and completes the `done` future.
      override def postStop(): Unit = {
        // for GC, in case stage is still referenced for some reason, e.g. the materialized value
        queryState = QueryIdle
        // no effect if onFailure already failed the promise
        donePromise.trySuccess(Done)
      }

      // Single failure path of the stage: fails the `done` future first, then the stage itself,
      // so that postStop's trySuccess cannot turn the failure into a success.
      def onFailure(e: Throwable): Unit = {
        donePromise.tryFailure(e)
        failStage(e)
      }

      // Dispatches the two timers used by this stage; any other key is a programming error.
      override protected def onTimer(timerKey: Any): Unit =
        timerKey match {
          case Continue =>
            continue()
          case LookForMissingSeqNr =>
            lookForMissingSeqNr()
          case unknown =>
            throw new IllegalStateException(s"Unexpected timerKey: $unknown")
        }

      // Handler of the periodic Continue tick: starts a query when idle, or keeps consuming a
      // result that is already available.
      def continue(): Unit =
        lookingForMissingSeqNr match {
          case Some(_) =>
          // regular continue-by-tick disabled when looking for missing seqNr
          case None =>
            queryState match {
              case _: QueryInProgress => // result will come
              case _: QueryResult     => tryPushOne()
              case QueryIdle          => query(switchPartition = false)
            }
        }

      // Handler of the LookForMissingSeqNr timer: retries the query for the missing sequence
      // number, or fails the stage once the search deadline has passed.
      def lookForMissingSeqNr(): Unit =
        lookingForMissingSeqNr match {
          case None =>
            throw new IllegalStateException("Should not be able to get here")
          case Some(missing) =>
            if (missing.deadline.isOverdue()) {
              import pekko.util.PrettyDuration.PrettyPrintableDuration
              val waited = querySettings.eventsByPersistenceIdEventTimeout.pretty
              val message =
                s"Sequence number [$expectedNextSeqNr] still missing after [$waited], " +
                s"saw unexpected seqNr [${missing.sawSeqNr}] for persistenceId [$persistenceId]."
              onFailure(new IllegalStateException(message))
            } else {
              queryState = QueryIdle
              query(switchPartition = false)
            }
        }

      // Issues an asynchronous event query starting at `expectedNextSeqNr`; the result arrives
      // via `newResultSetCb`.
      //
      // switchPartition: when true the query targets the partition after the current one.
      //
      // Throws IllegalStateException when a query is already running or the previous result
      // has not been fully consumed.
      def query(switchPartition: Boolean): Unit = {
        queryState match {
          case _: QueryInProgress =>
            throw new IllegalStateException("Query already in progress")
          case QueryResult(previous, _, _) if !isExhausted(previous) =>
            throw new IllegalStateException("Previous query was not exhausted")
          case _ => // idle, or the previous result is exhausted: good
        }

        val targetPartition = partition + (if (switchPartition) 1L else 0L)
        queryState = QueryInProgress(switchPartition, fetchMore = false, System.nanoTime())

        // while searching for a missing seqNr only that single sequence number is requested
        val searchingForMissing = lookingForMissingSeqNr.isDefined
        val template =
          if (searchingForMissing) "EventsByPersistenceId [{}] Query for missing seqNr [{}] in partition [{}]"
          else "EventsByPersistenceId [{}] Query from seqNr [{}] in partition [{}]"
        log.debug(template, persistenceId, expectedNextSeqNr, targetPartition)
        val upperSeqNr = if (searchingForMissing) expectedNextSeqNr else toSeqNr

        val pendingResult =
          session.selectEventsByPersistenceId(persistenceId, targetPartition, expectedNextSeqNr, upperSeqNr)
        pendingResult.onComplete(newResultSetCb.invoke)
      }

      // Downstream demand: try to emit one row (does nothing unless a QueryResult is available).
      override def onPull(): Unit =
        tryPushOne()

      // Core step of the stage, invoked on downstream pull, when a result set arrives and on
      // Continue ticks. It only acts when a QueryResult is present and `out` has demand; it then
      // completes the stage, emits one row, fetches the next page, skips a row, starts a search
      // for a missing seqNr, or (once the result set is exhausted) decides what to query next.
      // The only recursion is the tail call used to skip a row.
      @tailrec private def tryPushOne(): Unit = {
        queryState match {
          case QueryResult(rs, empty, switchPartition) if isAvailable(out) =>
            // Decides the next step once the current result set has been fully consumed.
            def afterExhausted(): Unit = {
              queryState = QueryIdle
              // When ResultSet is exhausted we immediately look in next partition for more events.
              // We keep track of if the query was such switching partition and if result is empty
              // we complete the stage or wait until next Continue tick.
              if (empty && switchPartition && lookingForMissingSeqNr.isEmpty) {
                if (expectedNextSeqNr < toSeqNr && !querySettings.gapFreeSequenceNumbers) {
                  log.warning(
                    "Gap found! Checking if data in partition was deleted for {}, expected seq nr: {}, current partition nr: {}",
                    persistenceId,
                    expectedNextSeqNr,
                    partition)
                  checkForGaps(foundEmptyPartitionCount = 0)
                } else if (refreshInterval.isEmpty) {
                  // not a live query and nothing more to read
                  completeStage()
                } else {
                  // live query: apply requests that arrived while the stage was busy
                  pendingFastForward.foreach { nextNr =>
                    if (nextNr > expectedNextSeqNr)
                      internalFastForward(nextNr)
                    pendingFastForward = None
                  }
                  pendingPoll.foreach { pollNr =>
                    if (pollNr >= expectedNextSeqNr)
                      query(switchPartition = false)
                    pendingPoll = None
                  }
                }
              } else {
                // TODO if we are far from the partition boundary we could skip this query if refreshInterval.nonEmpty
                query(switchPartition = true) // next partition
              }
            }

            if (reachedEndCondition())
              completeStage()
            else if (isExhausted(rs)) {
              (lookingForMissingSeqNr, pendingFastForward) match {
                case (Some(MissingSeqNr(_, sawSeqNr)), Some(fastForwardTo)) if fastForwardTo >= sawSeqNr =>
                  // the fast forward jumps past the gap, so the search is no longer needed
                  log.debug(
                    "Aborting missing sequence search: {} nr due to fast forward to next sequence nr: {}",
                    lookingForMissingSeqNr,
                    fastForwardTo)
                  internalFastForward(fastForwardTo)
                  pendingFastForward = None
                  lookingForMissingSeqNr = None
                  afterExhausted()
                case (Some(_), None) =>
                  // still missing: retry shortly via the LookForMissingSeqNr timer
                  queryState = QueryIdle
                  scheduleOnce(LookForMissingSeqNr, 200.millis)
                case _ =>
                  afterExhausted()
              }
            } else if (rs.remaining() == 0) {
              // current page consumed but the result set is not exhausted: fetch the next page
              log.debug("EventsByPersistenceId [{}] Fetch more from seqNr [{}]", persistenceId, expectedNextSeqNr)
              queryState = QueryInProgress(switchPartition, fetchMore = true, System.nanoTime())
              val rsFut = rs.fetchNextPage().asScala
              rsFut.onComplete(newResultSetCb.invoke)
            } else {
              val row = rs.one()
              val sequenceNr = extractSeqNr(row)
              if ((sequenceNr < expectedNextSeqNr && fastForwardEnabled) || pendingFastForward.exists(_ > sequenceNr)) {
                // skip event due to fast forward
                tryPushOne()
              } else if (pendingFastForward.isEmpty && querySettings.gapFreeSequenceNumbers && sequenceNr > expectedNextSeqNr) {
                // we will probably now come in here which isn't what we want
                lookingForMissingSeqNr match {
                  case Some(_) =>
                    throw new IllegalStateException(
                      s"Should not be able to get here when already looking for missing seqNr [$expectedNextSeqNr] for entity [$persistenceId]")
                  case None =>
                    log.debug(
                      "EventsByPersistenceId [{}] Missing seqNr [{}], found [{}], looking for event eventually appear",
                      persistenceId,
                      expectedNextSeqNr,
                      sequenceNr)
                    lookingForMissingSeqNr = Some(
                      MissingSeqNr(Deadline.now + querySettings.eventsByPersistenceIdEventTimeout, sequenceNr))
                    // Forget about any other rows in this result set until we find
                    // the missing sequence nrs
                    queryState = QueryIdle
                    query(switchPartition = false)
                }
              } else {
                // regular case: emit the row and advance the read position
                expectedNextSeqNr = sequenceNr + 1
                partition = row.getLong("partition_nr")
                count += 1
                push(out, row)

                if (reachedEndCondition())
                  completeStage()
                else if (lookingForMissingSeqNr.isDefined) {
                  // we found that missing seqNr
                  log.debug("EventsByPersistenceId [{}] Found missing seqNr [{}]", persistenceId, sequenceNr)
                  lookingForMissingSeqNr = None
                  queryState = QueryIdle
                  if (refreshInterval.isEmpty) query(switchPartition = false)
                  else afterExhausted()

                } else if (isExhausted(rs)) {
                  afterExhausted()
                }

              }
            }

          case QueryIdle | _: QueryInProgress | _: QueryResult => // ok
        }
      }

      // See PR #509 for background
      // Only used when gapFreeSequenceNumbers==false
      // Probes the current partition for a single row. If a full partition was cleaned up,
      // `checkForGapsCb` keeps probing the following partitions and only completes the stage
      // when a probe is still empty with foundEmptyPartitionCount at 5.
      def checkForGaps(foundEmptyPartitionCount: Int): Unit = {
        val probe = session.selectSingleRow(persistenceId, partition)
        probe.onComplete { outcome =>
          checkForGapsCb.invoke((foundEmptyPartitionCount, outcome))
        }
      }

      // Reads the `sequence_nr` column of an event row.
      def extractSeqNr(row: Row): Long = row.getLong("sequence_nr")

      // True once the stage has nothing more to emit: the next expected sequence number is
      // beyond `toSeqNr`, or `max` rows have already been pushed.
      def reachedEndCondition(): Boolean =
        expectedNextSeqNr > toSeqNr || count >= max

      // external call via Control materialized value
      // Hands the known sequence number to `pollCb`, which runs the request inside the stage.
      // An IllegalStateException from `invoke` is deliberately ignored.
      override def poll(knownSeqNr: Long): Unit =
        try pollCb.invoke(knownSeqNr)
        catch {
          case _: IllegalStateException =>
          // not initialized, see Akka issue #20503, but that is ok since this
          // is just best effort
        }

      // external call via Control materialized value
      // Hands the target sequence number to `fastForwardCb`, which runs the request inside the stage.
      // Throws IllegalStateException right away when fast forward is disabled for this stage.
      // An IllegalStateException from `invoke` is deliberately ignored.
      override def fastForward(nextSeqNr: Long): Unit = {
        log.debug("Received fast forward request {}", nextSeqNr)
        if (!fastForwardEnabled)
          throw new IllegalStateException("Fast forward has been disabled")

        try fastForwardCb.invoke(nextSeqNr)
        catch {
          case _: IllegalStateException =>
          // not initialized, see Akka issue #20503, but that is ok since this
          // is just best effort
        }
      }

      // materialized value
      // Completes with Done when the stage stops, or with the failure passed to onFailure.
      override def done: Future[Done] = donePromise.future

      setHandler(out, this)
    }
    (logic, logic)
  }