def createLogic()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/query/AllPersistenceIdsStage.scala [55:153]


  /**
   * Creates the logic that runs the prepared statement against the session and pushes
   * every `persistence_id` it has not pushed before.
   *
   * Rows are de-duplicated through `knownPersistenceIds` and staged in `buffer` until the
   * outlet has demand. When `refreshInterval` is empty the outlet is completed once the
   * result set is exhausted and the buffer has been drained; otherwise the query is
   * re-issued on every `Continue` timer tick.
   *
   * At most one driver request (query or page fetch) is in flight at any time, and a
   * failed request fails the stage instead of leaving it waiting forever.
   *
   * @param inheritedAttributes attributes passed in by the materializer (not read here)
   * @return the timer-driven stage logic, also acting as the handler for `out`
   */
  def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with StageLogging with OutHandler {

      // True from the moment a query or page fetch is issued until its outcome has been
      // delivered to one of the async callbacks below.
      private var queryInProgress = false
      // Every id that has already been buffered; used to emit each id at most once.
      private var knownPersistenceIds: Set[String] = Set.empty
      // The most recently received page, or None before the first response.
      private var maybeResultSet = Option.empty[AsyncResultSet]
      // Ids waiting for downstream demand.
      private var buffer: Queue[String] = Queue.empty[String]

      // Handles a successfully received page inside the stage.
      private val queryCallback: AsyncCallback[AsyncResultSet] =
        getAsyncCallback[AsyncResultSet] { rs =>
          queryInProgress = false
          maybeResultSet = Some(rs)
          rs.currentPage().forEach { row =>
            val s = row.getString("persistence_id")
            if (!knownPersistenceIds.contains(s)) {
              buffer = buffer.enqueue(s)
              knownPersistenceIds += s
            }
          }
          flush()
          if (refreshInterval.isEmpty && buffer.isEmpty && isExhausted(rs)) {
            complete(out)
          } else if (rs.hasMorePages) {
            fetchNextPage(rs)
          }
        }

      // Handles a failed query or page fetch inside the stage: without this the failure
      // would be dropped and `queryInProgress` would stay true forever.
      private val failureCallback: AsyncCallback[Throwable] =
        getAsyncCallback[Throwable] { cause =>
          queryInProgress = false
          failStage(cause)
        }

      private def isExhausted(rs: AsyncResultSet): Boolean = {
        rs.remaining() == 0 && !rs.hasMorePages
      }

      // Routes the outcome of a driver request to the matching async callback.
      private def deliver(result: java.util.concurrent.CompletionStage[AsyncResultSet]): Unit = {
        result.whenComplete { (rs: AsyncResultSet, failure: Throwable) =>
          // `failure` is null on success (Java API contract), hence the Option wrapper.
          Option(failure) match {
            case Some(cause) => failureCallback.invoke(cause)
            case None        => queryCallback.invoke(rs)
          }
        }
      }

      // Requests the next page and marks the request as in flight so that neither
      // `onPull` nor the timer can issue an overlapping request for the same page.
      private def fetchNextPage(rs: AsyncResultSet): Unit = {
        queryInProgress = true
        deliver(rs.fetchNextPage())
      }

      // Starts a fresh query unless a request is already in flight or the current
      // result set has not been fully consumed yet; in those cases the call is ignored.
      private def query(): Unit = {
        if (!queryInProgress && maybeResultSet.forall(isExhausted)) {
          queryInProgress = true
          val boundStatement = preparedStatement.bind().setExecutionProfileName(readProfile)
          deliver(session.executeAsync(boundStatement))
        }
      }

      // Pushes buffered ids for as long as the outlet has demand.
      private def flush(): Unit = {
        while (buffer.nonEmpty && isAvailable(out)) {
          val (s, newBuffer) = buffer.dequeue
          buffer = newBuffer
          push(out, s)
        }
      }

      @nowarn("msg=deprecated") // keep compatible with akka 2.5
      override def preStart(): Unit = {
        query()
        refreshInterval.foreach { interval =>
          // For intervals of at least two seconds the first tick is randomized to
          // somewhere between half the interval and the full interval.
          val initial =
            if (interval >= 2.seconds)
              (interval / 2) + ThreadLocalRandom.current().nextLong(interval.toMillis / 2).millis
            else interval

          schedulePeriodicallyWithInitialDelay(Continue, initial, interval)
        }
      }

      override def onTimer(timerKey: Any): Unit = {
        timerKey match {
          case Continue =>
            query()

          case _ =>
        }
      }

      def onPull(): Unit = {
        flush()
        if (buffer.isEmpty && isAvailable(out)) {
          maybeResultSet match {
            case None =>
              query()

            case Some(rs) =>
              if (refreshInterval.isEmpty && isExhausted(rs)) {
                complete(out)
              } else if (!queryInProgress && rs.remaining() == 0 && rs.hasMorePages) {
                fetchNextPage(rs)
              }
          }
        }
      }

      setHandler(out, this)
    }