in stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala [501:663]
/** Attributes of this stage: the default `split` attributes combined with the source location of the predicate `p`. */
override def initialAttributes: Attributes =
  DefaultAttributes.split.and(SourceLocation.forLambda(p))
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler { parent =>
import Split._
// Key under which the subscription timer is scheduled (pushSubstreamSource) and cancelled (SubstreamHandler.onPull).
private val SubscriptionTimer = "SubstreamSubscriptionTimer"
// Supervision decider read from the inherited attributes; lazy, so it is only resolved on first use.
// Consulted when the predicate throws inside a substream and when a substream is cancelled.
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
// Subscription timeout from the inherited attributes; used as the timer delay and passed to substreamSource.timeout.
private val timeout: FiniteDuration =
inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
// Outlet of the currently open substream; null while no substream is open.
private var substreamSource: SubSourceOutlet[T] = null
// True when a substream has been created in handOver but `out` was not available to emit it yet.
private var substreamWaitingToBePushed = false
// True once the current substream has been cancelled by its consumer; reset whenever handOver opens a new substream.
private var substreamCancelled = false
/**
 * Tells whether the cancellation of a substream should cancel the whole stage.
 *
 * The cancellation cause is handed to the supervision decider: only a `Stop`
 * directive propagates the cancellation, `Resume` and `Restart` do not.
 *
 * @param ex the cause the substream was cancelled with
 * @return `true` if the stage should be cancelled as well
 */
def propagateSubstreamCancel(ex: Throwable): Boolean = {
  val directive = decider(ex)
  directive match {
    case Supervision.Stop                         => true
    case Supervision.Resume | Supervision.Restart => false
  }
}
/**
 * Demand on the outer outlet `out`.
 *
 * With a substream pending emission, emit it now. With no substream open, request the
 * next upstream element, unless `in` has already been pulled by a substream that just
 * ended (possible with split-after). Otherwise there is nothing to do.
 */
override def onPull(): Unit =
  if (substreamSource ne null) {
    if (substreamWaitingToBePushed) pushSubstreamSource()
  } else if (!hasBeenPulled(in)) {
    pull(in)
  }
/**
 * The outer downstream cancelled.
 *
 * The stage is cancelled right away unless a substream has been handed out and is still
 * live (exists, was already emitted, and has not been cancelled); in that case the stage
 * keeps running.
 *
 * @param cause the cancellation cause, forwarded to `cancelStage`
 */
override def onDownstreamFinish(cause: Throwable): Unit = {
  val liveSubstreamHandedOut =
    (substreamSource ne null) && !substreamWaitingToBePushed && !substreamCancelled
  if (!liveSubstreamHandedOut) cancelStage(cause)
}
/**
 * Handles an element that arrives while no substream is open.
 *
 * With `SplitAfter` and a matching element, the element forms a complete one-element
 * substream and an empty follow-up substream is prepared. In every other case the element
 * is buffered as the first element of a newly opened substream.
 *
 * The predicate is evaluated under the supervision decider, consistently with
 * `SubstreamHandler.onPush`: `Resume` drops the element and pulls the next one,
 * `Stop` and `Restart` fail the stage.
 */
override def onPush(): Unit = {
  val elem = grab(in)
  // Only the user-supplied predicate is supervised; port operations below are deliberately
  // left outside the try so that genuine protocol errors are not swallowed by `Resume`.
  var predicateFailure: Throwable = null
  val endsSubstream =
    try {
      decision == SplitAfter && p(elem)
    } catch {
      case NonFatal(ex) =>
        predicateFailure = ex
        false
    }
  if (predicateFailure ne null) {
    decider(predicateFailure) match {
      case Supervision.Resume                     => pull(in) // drop the offending element
      case Supervision.Stop | Supervision.Restart => failStage(predicateFailure)
    }
  } else {
    val handler = new SubstreamHandler
    if (!endsSubstream) handler.firstElem = elem
    // `out` may already be closed while this handler is still receiving elements (the outer
    // downstream cancelled while a substream was live); pushing would throw, so skip the
    // push and let handOver complete the stage.
    // NOTE(review): when `in` was pulled by a finishing substream rather than by `out`,
    // `out` may not have been pulled yet at this point — confirm push cannot be reached then.
    else if (!isClosed(out)) push(out, Source.single(elem))
    // Next pull will come from the next substream that we will open
    handOver(handler)
  }
}
// This handler is only installed on `in` while no substream is open, so there is
// nothing left to flush when upstream finishes: complete the stage directly.
override def onUpstreamFinish(): Unit = completeStage()
// initial input handler: the logic itself handles both `in` and `out` until a substream is opened
setHandlers(in, out, this)
/**
 * Opens a new substream and makes `handler` responsible for it.
 *
 * If the outer outlet is already closed the stage is completed instead. Otherwise a fresh
 * substream outlet is created, `handler` takes over `in`, and the stage is kept alive for as
 * long as the handler holds a buffered first element. The substream is emitted immediately
 * when `out` has demand; if not, it is flagged as waiting to be pushed.
 *
 * @param handler handler for the new substream, possibly carrying its first element
 */
private def handOver(handler: SubstreamHandler): Unit =
  if (!isClosed(out)) {
    val freshSource = new SubSourceOutlet[T]("SplitSource")
    freshSource.setHandler(handler)
    substreamSource = freshSource
    substreamCancelled = false
    setHandler(in, handler)

    val carriesFirstElement = handler.hasInitialElement
    setKeepGoing(enabled = carriesFirstElement)

    if (!isAvailable(out)) substreamWaitingToBePushed = true
    else if (carriesFirstElement || decision == SplitBefore) pushSubstreamSource()
    else pull(in)
  } else {
    completeStage()
  }
/**
 * Emits the current substream on `out`, arms the subscription timer for it and clears the
 * waiting-to-be-pushed flag.
 */
private def pushSubstreamSource(): Unit = {
  val emitted = Source.fromGraph(substreamSource.source)
  push(out, emitted)
  scheduleOnce(SubscriptionTimer, timeout)
  substreamWaitingToBePushed = false
}
/**
 * Subscription timer fired: apply the subscription timeout to the current substream.
 *
 * The timer is keyed per stage rather than per substream, so it is guarded against firing
 * in a state it was not armed for:
 *  - `substreamSource` may be null (a split-after substream has ended and the next one is
 *    not open yet), which would otherwise throw a NullPointerException;
 *  - `substreamWaitingToBePushed` means the current substream has not been emitted yet.
 *    The timer is only armed in `pushSubstreamSource`, which clears that flag, so a firing
 *    timer then belongs to an earlier substream and must not time out the new one.
 *
 * @param timerKey key of the fired timer (only `SubscriptionTimer` is scheduled by this stage)
 */
override protected def onTimer(timerKey: Any): Unit =
  if ((substreamSource ne null) && !substreamWaitingToBePushed) substreamSource.timeout(timeout)
/**
 * Handler for one open substream. It is installed both as the `in` handler of the stage and
 * as the handler of the substream outlet, so `onPull`/`onDownstreamFinish` are driven by the
 * substream's consumer and `onPush`/`onUpstreamFinish`/`onUpstreamFailure` by upstream.
 */
private class SubstreamHandler extends InHandler with OutHandler {

  // First element of the substream, buffered until the substream pulls; null when nothing is buffered.
  var firstElem: T = null.asInstanceOf[T]

  /** True while a first element is buffered and has not yet been pushed into the substream. */
  def hasInitialElement: Boolean = firstElem.asInstanceOf[AnyRef] ne null

  // Set when upstream finishes while the first element is still buffered; completion is then
  // deferred until that element has been delivered.
  private var willCompleteAfterInitialElement = false

  /**
   * Ends the current substream at a split point.
   *
   * With `SplitAfter` the splitting element is the last element of this substream; with
   * `SplitBefore` it becomes the buffered first element of the next `handler`. Nothing is
   * pushed or completed on a substream that has already been cancelled.
   * The substream is assumed to be in a pushable state when this method is entered.
   */
  private def closeThis(handler: SubstreamHandler, currentElem: T): Unit = {
    decision match {
      case SplitAfter =>
        if (!substreamCancelled) {
          substreamSource.push(currentElem)
          substreamSource.complete()
        }
      case SplitBefore =>
        handler.firstElem = currentElem
        if (!substreamCancelled) substreamSource.complete()
    }
  }

  /** Demand from the substream: deliver the buffered first element if any, otherwise pull upstream. */
  override def onPull(): Unit = {
    // The substream is being consumed, so the subscription timeout no longer applies.
    cancelTimer(SubscriptionTimer)
    if (hasInitialElement) {
      substreamSource.push(firstElem)
      firstElem = null.asInstanceOf[T]
      // The stage was only kept alive for the sake of the buffered element.
      setKeepGoing(false)
      if (willCompleteAfterInitialElement) {
        substreamSource.complete()
        completeStage()
      }
    } else pull(in)
  }

  /**
   * The substream's consumer cancelled. Either the whole stage is cancelled (upstream already
   * closed, or the decider asks to propagate), or the rest of this substream is drained.
   */
  override def onDownstreamFinish(cause: Throwable): Unit = {
    // The substream's consumer is gone, so its subscription timeout is moot. Cancel the timer
    // so it cannot fire after this substream has been replaced or cleared.
    cancelTimer(SubscriptionTimer)
    substreamCancelled = true
    if (isClosed(in) || propagateSubstreamCancel(cause)) {
      cancelStage(cause)
    } else {
      // Start draining
      if (!hasBeenPulled(in)) pull(in)
    }
  }

  /**
   * Upstream element for the open substream. A matching element closes this substream
   * (opening the next one immediately for `SplitBefore`, or handing `in` back to the outer
   * logic for `SplitAfter`); a non-matching element is forwarded, or discarded when the
   * substream has been cancelled. Failures are routed through the supervision decider.
   */
  override def onPush(): Unit = {
    val elem = grab(in)
    try {
      if (p(elem)) {
        val handler = new SubstreamHandler
        closeThis(handler, elem)
        if (decision == SplitBefore) handOver(handler)
        else {
          // No substream is open until the next element arrives at the outer handler.
          substreamSource = null
          setHandler(in, parent)
          pull(in)
        }
      } else {
        // Drain into the void
        if (substreamCancelled) pull(in)
        else substreamSource.push(elem)
      }
    } catch {
      case NonFatal(ex) =>
        decider(ex) match {
          case Supervision.Resume  => pull(in)
          case Supervision.Stop    => onUpstreamFailure(ex)
          case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?
        }
    }
  }

  /** Upstream finished: complete now, or defer until the buffered first element has been delivered. */
  override def onUpstreamFinish(): Unit =
    if (hasInitialElement) willCompleteAfterInitialElement = true
    else {
      substreamSource.complete()
      completeStage()
    }

  /** Upstream failed: fail the open substream first, then the stage. */
  override def onUpstreamFailure(ex: Throwable): Unit = {
    substreamSource.fail(ex)
    failStage(ex)
  }
}
}