// NOTE(review): `SourceShape()` is called here without an outlet, unlike the indented `shape` definition
// further down, which passes `out`. Presumably this line is residue from an extraction step rather than
// real code; confirm before relying on it.
override def shape: SourceShape[T] = SourceShape()

in core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/ContinuousQuery.scala [87:197]


  /** Shape of this stage: a source shape built from the outlet `out`. */
  override def shape: SourceShape[T] =
    SourceShape[T](out)

  /**
   * Builds the stage logic that runs one query after another and forwards their rows to `out`.
   *
   * Every source returned by `nextQuery` is materialized as a sub-stream feeding `sinkIn`. At most one row is
   * buffered (`nextRow`) while downstream has no demand. When a sub-stream finishes, `next()` either starts the
   * following query right away or schedules it after the delay returned by `delayNextQuery`. The stage completes
   * when `nextQuery` returns no source.
   */
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogicWithLogging(shape) with OutHandler {
      // row grabbed from the sub-stream while `out` was not available
      var nextRow: OptionVal[T] = OptionVal.none[T]
      // inlet of the current (or most recent) query; null until the first query has been started
      var sinkIn: SubSinkInlet[T] = _
      var state = initialState
      // Long.MaxValue is a sentinel: next() then starts a query without consulting delayNextQuery
      var nrElements = Long.MaxValue
      // set when the sub-stream finished while a row was still buffered in nextRow
      var subStreamFinished = false

      // Re-enters the stage once the future returned by beforeQuery has completed.
      private val beforeQueryCallback = getAsyncCallback[Try[S]] {
        case Failure(exc) =>
          failStage(exc)
        case Success(newState) =>
          state = newState
          runNextQuery()
      }

      // The state is updated before the element is emitted.
      private def pushAndUpdateState(t: T): Unit = {
        val updated = updateState(state, t)
        state = updated
        push(out, t)
      }

      override protected def onTimer(timerKey: Any): Unit =
        timerKey match {
          case NextQuery => next()
        }

      /** Starts the next query, or schedules it when `delayNextQuery` asks for a delay. */
      def next(): Unit = {
        val skipDelay = nrElements == Long.MaxValue
        val delay = if (skipDelay) None else delayNextQuery(state)

        delay match {
          case None =>
            startQuery()
          case Some(d) =>
            // the sentinel makes the timer-triggered next() start the query immediately
            nrElements = Long.MaxValue
            scheduleOnce(NextQuery, d)
        }
      }

      // Resets the per-query bookkeeping, then runs the query, after the beforeQuery future if there is one.
      private def startQuery(): Unit = {
        nrElements = 0
        subStreamFinished = false

        beforeQuery(state).fold(runNextQuery()) { beforeQueryFuture =>
          beforeQueryFuture.onComplete(beforeQueryCallback.invoke)(ExecutionContexts.parasitic)
        }
      }

      // Asks nextQuery for the next source; materializes it as a sub-stream, or completes the stage if none.
      private def runNextQuery(): Unit = {
        val (newState, maybeSource) = nextQuery(state)
        state = newState
        maybeSource match {
          case None =>
            completeStage()
          case Some(source) =>
            sinkIn = new SubSinkInlet[T]("queryIn")
            sinkIn.setHandler(newRowHandler())
            val graph = Source.fromGraph(source).to(sinkIn.sink)
            interpreter.subFusingMaterializer.materialize(graph)
            sinkIn.pull()
        }
      }

      // Handler for rows arriving from the sub-stream currently attached to sinkIn.
      private def newRowHandler(): InHandler = new InHandler {
        override def onPush(): Unit = {
          if (nextRow.isDefined) {
            throw new IllegalStateException(s"onPush called when we already have next row.")
          }
          nrElements += 1
          if (!isAvailable(out)) {
            // no downstream demand yet: keep the row until onPull
            nextRow = OptionVal(sinkIn.grab())
          } else {
            val element = sinkIn.grab()
            pushAndUpdateState(element)
            sinkIn.pull()
          }
        }

        override def onUpstreamFinish(): Unit =
          if (nextRow.isEmpty) next()
          else subStreamFinished = true // wait for the element to be pulled
      }

      // eager pull
      override def preStart(): Unit = next()

      override def onPull(): Unit =
        nextRow match {
          case OptionVal.Some(buffered) =>
            pushAndUpdateState(buffered)
            nextRow = OptionVal.none[T]
            if (subStreamFinished) next()
            else pullSubStreamIfIdle()
          case OptionVal.None =>
            if (!subStreamFinished && sinkIn != null) pullSubStreamIfIdle()
        }

      // Pulls the sub-stream unless it is closed or already has an outstanding pull.
      private def pullSubStreamIfIdle(): Unit =
        if (!(sinkIn.isClosed || sinkIn.hasBeenPulled)) sinkIn.pull()

      setHandler(out, this)
    }