in core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/ContinuousQuery.scala [87:197]
/** Shape of this stage: a source exposing only the `out` outlet. */
override def shape: SourceShape[T] =
  SourceShape[T](out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogicWithLogging(shape) with OutHandler {
// Query state threaded through `updateState`, `beforeQuery` and `nextQuery`.
var state = initialState
// Inlet of the currently running query source; stays unset until the first query is materialized.
var sinkIn: SubSinkInlet[T] = _
// Element grabbed from the query source while `out` had no demand; emitted on the next pull.
var nextRow: OptionVal[T] = OptionVal.none[T]
// Set when the query source finished while `nextRow` was still waiting to be emitted.
var subStreamFinished: Boolean = false
// Number of elements received from the current query. `Long.MaxValue` is a sentinel that makes
// `next()` skip `delayNextQuery` (initial value, and set again once a delay has been scheduled).
var nrElements: Long = Long.MaxValue
// Async callback receiving the outcome of the `beforeQuery` future: a failure fails the stage,
// a success replaces `state` and then starts the next query.
private val beforeQueryCallback = getAsyncCallback[Try[S]] { outcome =>
  outcome match {
    case Failure(cause) =>
      failStage(cause)
    case Success(refreshedState) =>
      state = refreshedState
      runNextQuery()
  }
}
/**
 * Folds the element into `state` via `updateState` and then emits it on `out`.
 *
 * @param t the element to emit downstream
 */
private def pushAndUpdateState(t: T): Unit = {
  val advancedState = updateState(state, t)
  state = advancedState
  push(out, t)
}
/**
 * Timer hook: the `NextQuery` key triggers `next()`.
 *
 * Any other key is not matched and therefore raises a `MatchError`.
 */
override protected def onTimer(timerKey: Any): Unit = {
  timerKey match {
    case NextQuery =>
      next()
  }
}
/**
 * Decides whether the next query runs now or after a delay.
 *
 * When `nrElements` holds the `Long.MaxValue` sentinel no delay is looked up; otherwise
 * `delayNextQuery(state)` is consulted. With a delay, the sentinel is restored and a one-off
 * `NextQuery` timer is scheduled. Without a delay, the per-query bookkeeping is reset and the
 * query is started, either directly or once the future returned by `beforeQuery` has completed.
 */
def next(): Unit = {
  val skipDelayLookup = nrElements == Long.MaxValue
  val pendingDelay =
    if (skipDelayLookup) None
    else delayNextQuery(state)

  pendingDelay match {
    case None =>
      // fresh query: restart the element count and clear the finished flag
      nrElements = 0
      subStreamFinished = false
      beforeQuery(state) match {
        case Some(preparation) =>
          // the outcome is handed to the async callback, which continues with runNextQuery()
          preparation.onComplete(beforeQueryCallback.invoke)(ExecutionContexts.parasitic)
        case None =>
          runNextQuery()
      }
    case Some(pause) =>
      // restoring the sentinel makes the timer-triggered next() skip the delay lookup
      nrElements = Long.MaxValue
      scheduleOnce(NextQuery, pause)
  }
}
/**
 * Asks `nextQuery` for the next source and starts consuming it.
 *
 * `nextQuery(state)` yields the state to continue with and an optional source. Without a source
 * the stage completes. With a source, a new sub-sink inlet is created, the source is materialized
 * into it with the sub-fusing materializer, and the first element is requested.
 */
private def runNextQuery(): Unit = {
  nextQuery(state) match {
    case (finalState, None) =>
      state = finalState
      completeStage()

    case (queryState, Some(querySource)) =>
      state = queryState
      sinkIn = new SubSinkInlet[T]("queryIn")

      val rowHandler = new InHandler {
        override def onPush(): Unit = {
          if (nextRow.isDefined) {
            throw new IllegalStateException("onPush called when we already have next row.")
          }
          nrElements += 1
          val row = sinkIn.grab()
          if (isAvailable(out)) {
            // downstream has demand: emit right away and ask the query for more
            pushAndUpdateState(row)
            sinkIn.pull()
          } else {
            // no demand yet: park the row until onPull picks it up
            nextRow = OptionVal(row)
          }
        }

        override def onUpstreamFinish(): Unit =
          if (nextRow.isEmpty) {
            next()
          } else {
            // a parked row is still waiting; onPull continues once it has been emitted
            subStreamFinished = true
          }
      }
      sinkIn.setHandler(rowHandler)

      val queryGraph = Source.fromGraph(querySource).to(sinkIn.sink)
      interpreter.subFusingMaterializer.materialize(queryGraph)
      sinkIn.pull()
  }
}
/** Starts the first query as soon as the stage starts, without waiting for downstream demand. */
override def preStart(): Unit = {
  // eager pull
  next()
}
/**
 * Downstream demand handler.
 *
 * A parked row is emitted first; afterwards either the next query is triggered (the query source
 * had already finished) or the query source is pulled again. Without a parked row the query
 * source is pulled, provided one exists, has not finished, is not closed and has no pull pending.
 */
override def onPull(): Unit = {
  // only evaluated where sinkIn is known to be set
  def subStreamIdle: Boolean = !(sinkIn.isClosed || sinkIn.hasBeenPulled)

  nextRow match {
    case OptionVal.Some(parked) =>
      pushAndUpdateState(parked)
      nextRow = OptionVal.none[T]
      if (subStreamFinished) next()
      else if (subStreamIdle) sinkIn.pull()

    case OptionVal.None =>
      // sinkIn is still null before the first query has been materialized
      if (!subStreamFinished && sinkIn != null && subStreamIdle) {
        sinkIn.pull()
      }
  }
}
setHandler(out, this)
}