in stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala [282:457]
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with OutHandler with InHandler {
parent =>
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private val activeSubstreamsMap = new java.util.HashMap[Any, SubstreamSource]()
private val closedSubstreams =
if (allowClosedSubstreamRecreation) Collections.unmodifiableSet(Collections.emptySet[Any])
else new java.util.HashSet[Any]()
private val timeout: FiniteDuration =
inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
private var substreamWaitingToBePushed: Option[SubstreamSource] = None
private var nextElementKey: K = null.asInstanceOf[K]
private var nextElementValue: T = null.asInstanceOf[T]
private var _nextId = 0
private val substreamsJustStared = new java.util.HashSet[Any]()
private var firstPushCounter: Int = 0
private val tooManySubstreamsOpenException = new TooManySubstreamsOpenException
private def nextId(): Long = { _nextId += 1; _nextId }
private def hasNextElement = nextElementKey != null
private def clearNextElement(): Unit = {
nextElementKey = null.asInstanceOf[K]
nextElementValue = null.asInstanceOf[T]
}
private def tryCompleteAll(): Boolean =
if (activeSubstreamsMap.isEmpty || (!hasNextElement && firstPushCounter == 0)) {
for (value <- activeSubstreamsMap.values().asScala) value.complete()
completeStage()
true
} else false
private def tryCancel(cause: Throwable): Boolean =
// if there's no active substreams or there's only one but it's not been pushed yet
if (activeSubstreamsMap.isEmpty || (activeSubstreamsMap.size == 1 && substreamWaitingToBePushed.isDefined)) {
cancelStage(cause)
true
} else false
private def fail(ex: Throwable): Unit = {
for (value <- activeSubstreamsMap.values().asScala) value.fail(ex)
failStage(ex)
}
private def needToPull: Boolean =
!(hasBeenPulled(in) || isClosed(in) || hasNextElement || substreamWaitingToBePushed.nonEmpty)
override def onPull(): Unit = {
substreamWaitingToBePushed match {
case Some(substreamSource) =>
push(out, Source.fromGraph(substreamSource.source))
scheduleOnce(substreamSource.key, timeout)
substreamWaitingToBePushed = None
case None =>
if (hasNextElement) {
val subSubstreamSource = activeSubstreamsMap.get(nextElementKey)
if (subSubstreamSource.isAvailable) {
subSubstreamSource.push(nextElementValue)
clearNextElement()
}
} else if (!hasBeenPulled(in)) tryPull(in)
}
}
override def onUpstreamFailure(ex: Throwable): Unit = fail(ex)
override def onUpstreamFinish(): Unit = if (!tryCompleteAll()) setKeepGoing(true)
override def onDownstreamFinish(cause: Throwable): Unit = if (!tryCancel(cause)) setKeepGoing(true)
override def onPush(): Unit =
try {
val elem = grab(in)
val key = keyFor(elem)
require(key != null, "Key cannot be null")
val substreamSource = activeSubstreamsMap.get(key)
if (substreamSource != null) {
if (substreamSource.isAvailable) substreamSource.push(elem)
else {
nextElementKey = key
nextElementValue = elem
}
} else {
if (closedSubstreams.contains(key)) {
// If the sub stream is already closed, we just skip the current element and pull the next element.
pull(in)
} else if (activeSubstreamsMap.size + closedSubstreams.size == maxSubstreams) {
throw tooManySubstreamsOpenException
} else runSubstream(key, elem)
}
} catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop => fail(ex)
case Supervision.Resume | Supervision.Restart => if (!hasBeenPulled(in)) pull(in)
}
}
private def runSubstream(key: K, value: T): Unit = {
val substreamSource = new SubstreamSource("GroupBySource " + nextId(), key, value)
activeSubstreamsMap.put(key, substreamSource)
firstPushCounter += 1
if (isAvailable(out)) {
push(out, Source.fromGraph(substreamSource.source))
scheduleOnce(key, timeout)
substreamWaitingToBePushed = None
} else {
setKeepGoing(true)
substreamsJustStared.add(substreamSource)
substreamWaitingToBePushed = Some(substreamSource)
}
}
override protected def onTimer(timerKey: Any): Unit = {
val substreamSource = activeSubstreamsMap.get(timerKey)
if (substreamSource != null) {
if (!allowClosedSubstreamRecreation) {
closedSubstreams.add(timerKey)
}
activeSubstreamsMap.remove(timerKey)
if (isClosed(in)) tryCompleteAll()
}
}
setHandlers(in, out, this)
private class SubstreamSource(name: String, val key: K, var firstElement: T)
extends SubSourceOutlet[T](name)
with OutHandler {
def firstPush(): Boolean = firstElement != null
def hasNextForSubSource = hasNextElement && nextElementKey == key
private def completeSubStream(): Unit = {
complete()
activeSubstreamsMap.remove(key)
if (!allowClosedSubstreamRecreation) {
closedSubstreams.add(key)
}
}
private def tryCompleteHandler(): Unit = {
if (parent.isClosed(in) && !hasNextForSubSource) {
completeSubStream()
tryCompleteAll()
}
}
override def onPull(): Unit = {
cancelTimer(key)
if (firstPush()) {
firstPushCounter -= 1
push(firstElement)
firstElement = null.asInstanceOf[T]
substreamsJustStared.remove(this)
if (substreamsJustStared.isEmpty) setKeepGoing(false)
} else if (hasNextForSubSource) {
push(nextElementValue)
clearNextElement()
} else if (needToPull) pull(in)
tryCompleteHandler()
}
override def onDownstreamFinish(cause: Throwable): Unit = {
if (hasNextElement && nextElementKey == key) clearNextElement()
if (firstPush()) firstPushCounter -= 1
completeSubStream()
if (parent.isClosed(out)) tryCancel(cause)
if (parent.isClosed(in)) tryCompleteAll() else if (needToPull) pull(in)
}
setHandler(this)
}
}