in stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala [420:575]
/**
 * Shuts this materializer down by sending a `PoisonPill` to its supervisor.
 *
 * The message is only sent by the call that flips the shutdown flag from `false` to `true`;
 * calls that find the flag already set do nothing.
 */
override def shutdown(): Unit = {
  val flippedByThisCall = haveShutDown.compareAndSet(false, true)
  if (flippedByThisCall) supervisor ! PoisonPill
}
/** Returns the current value of the shutdown flag that `shutdown()` sets. */
override def isShutdown: Boolean = {
  haveShutDown.get()
}
/**
 * Returns a copy of this materializer whose flow-name generator is a copy of the current
 * one created with `name`.
 *
 * @param name value handed to `flowNames.copy`
 */
override def withNamePrefix(name: String): PhasedFusingActorMaterializer = {
  val renamedFlowNames = flowNames.copy(name)
  this.copy(flowNames = renamedFlowNames)
}
/** Produces the next name from the `flowNames` generator. */
private[this] def createFlowName(): String = {
  flowNames.next()
}
/**
 * Execution context backing this materializer: the dispatcher looked up by the id found in
 * the mandatory `ActorAttributes.Dispatcher` attribute of `defaultAttributes`.
 */
// note that this will never be overridden on a per-graph-stage basis regardless of more specific attributes
override lazy val executionContext: ExecutionContextExecutor = {
  val dispatcherId = defaultAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher
  dispatchers.lookup(dispatcherId)
}
/**
 * Delegates to the actor system scheduler's `scheduleWithFixedDelay`, running `task` on this
 * materializer's `executionContext`.
 *
 * @param initialDelay forwarded to the scheduler as the initial delay
 * @param delay forwarded to the scheduler as the delay
 * @param task the runnable to schedule
 * @return the `Cancellable` returned by the scheduler
 */
override def scheduleWithFixedDelay(
    initialDelay: FiniteDuration,
    delay: FiniteDuration,
    task: Runnable): Cancellable = {
  val scheduler = system.scheduler
  scheduler.scheduleWithFixedDelay(initialDelay, delay)(task)(executionContext)
}
/**
 * Delegates to the actor system scheduler's `scheduleAtFixedRate`, running `task` on this
 * materializer's `executionContext`.
 *
 * @param initialDelay forwarded to the scheduler as the initial delay
 * @param interval forwarded to the scheduler as the interval
 * @param task the runnable to schedule
 * @return the `Cancellable` returned by the scheduler
 */
override def scheduleAtFixedRate(
    initialDelay: FiniteDuration,
    interval: FiniteDuration,
    task: Runnable): Cancellable = {
  val scheduler = system.scheduler
  scheduler.scheduleAtFixedRate(initialDelay, interval)(task)(executionContext)
}
/**
 * Schedules `task` through the actor system scheduler's `scheduleAtFixedRate`, i.e. the same
 * scheduler call that `scheduleAtFixedRate` on this class makes, on this materializer's
 * `executionContext`.
 *
 * @param initialDelay forwarded to the scheduler as the initial delay
 * @param interval forwarded to the scheduler as the interval
 * @param task the runnable to schedule
 * @return the `Cancellable` returned by the scheduler
 */
override def schedulePeriodically(
    initialDelay: FiniteDuration,
    interval: FiniteDuration,
    task: Runnable): Cancellable = {
  val scheduler = system.scheduler
  scheduler.scheduleAtFixedRate(initialDelay, interval)(task)(executionContext)
}
/**
 * Delegates to the actor system scheduler's `scheduleOnce`, running `task` on this
 * materializer's `executionContext`.
 *
 * @param delay forwarded to the scheduler as the delay
 * @param task the runnable to schedule
 * @return the `Cancellable` returned by the scheduler
 */
override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable = {
  val scheduler = system.scheduler
  scheduler.scheduleOnce(delay, task)(executionContext)
}
/**
 * Materializes `_runnableGraph` using this materializer's own `defaultAttributes`.
 *
 * @param _runnableGraph the closed graph to materialize
 * @return the materialized value produced by the two-argument `materialize` overload
 */
override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat = {
  materialize(_runnableGraph, defaultAttributes)
}
/**
 * Materializes `_runnableGraph` with the given default attributes, using the companion
 * object's `DefaultPhase` and `DefaultPhases`.
 *
 * @param _runnableGraph the closed graph to materialize
 * @param defaultAttributes attributes forwarded unchanged to the four-argument overload
 * @return the materialized value produced by the four-argument `materialize` overload
 */
@InternalStableApi
override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat], defaultAttributes: Attributes): Mat = {
  val fallbackPhase = PhasedFusingActorMaterializer.DefaultPhase
  val phasesByTag = PhasedFusingActorMaterializer.DefaultPhases
  materialize(_runnableGraph, defaultAttributes, fallbackPhase, phasesByTag)
}
/**
 * Materializes a closed graph by interpreting its traversal step by step: atomic modules are
 * handed to the phase of the island currently being tracked, while the remaining traversal
 * steps manipulate a stack of materialized values, a stack of attributes and the island tracker.
 *
 * @param graph the closed graph; its `traversalBuilder` supplies both the traversal and the
 *              top-level attributes
 * @param defaultAttributes attributes combined (via `and`) with the graph's top-level attributes
 * @param defaultPhase phase handed to the `IslandTracking` together with `phases`
 * @param phases phases keyed by island tag, handed to the `IslandTracking`
 * @return the value on top of the materialized value stack once the traversal has been consumed,
 *         cast to `Mat`
 * @throws IllegalStateException if the materializer is already shut down on entry, or is found
 *                               to be shut down when the final island-ready step completes
 */
override def materialize[Mat](
graph: Graph[ClosedShape, Mat],
defaultAttributes: Attributes,
defaultPhase: Phase[Any],
phases: Map[IslandTag, Phase[Any]]): Mat = {
// Fail fast: nothing is materialized once shutdown() has flipped the flag.
if (isShutdown) throw new IllegalStateException("Trying to materialize stream after materializer has been shutdown")
// combine default attributes with top-level runnable/closed graph shape attributes so that per-stream
// attributes overriding defaults are used also for the top level interpreter etc.
val defaultAndGraphAttributes = defaultAttributes and graph.traversalBuilder.attributes
// Warn when fuzzing mode is enabled in the effective attributes, unless the warning was disabled.
if (defaultAndGraphAttributes.mandatoryAttribute[ActorAttributes.FuzzingMode].enabled && !fuzzingWarningDisabled) {
_logger.warning(
"Fuzzing mode is enabled on this system. If you see this warning on your production system then " +
"set 'pekko.stream.materializer.debug.fuzzing-mode' to off.")
}
// The island name prefix is a freshly generated flow name followed by "-".
val islandTracking = new IslandTracking(
phases,
settings,
defaultAndGraphAttributes,
defaultPhase,
this,
islandNamePrefix = createFlowName() + "-")
var current: Traversal = graph.traversalBuilder.traversal
// Effective attributes; the last element is what the step being interpreted sees.
val attributesStack = new java.util.ArrayDeque[Attributes](8)
attributesStack.addLast(defaultAndGraphAttributes)
// Traversals still waiting to be interpreted, seeded with the root traversal.
val traversalStack = new java.util.ArrayDeque[Traversal](16)
traversalStack.addLast(current)
// Materialized values produced so far; Pop/Transform/Compose operate on its last elements.
val matValueStack = new java.util.ArrayDeque[Any](8)
if (Debug) {
println(s"--- Materializing layout:")
TraversalBuilder.printTraversal(current)
println(s"--- Start materialization")
}
// Due to how Concat works, we need a stack. This probably can be optimized for the most common cases.
while (!traversalStack.isEmpty) {
current = traversalStack.removeLast()
// Follow the chain of steps until a step leaves nextStep at EmptyTraversal.
while (current ne EmptyTraversal) {
// Only the Concat case below assigns nextStep; every other step ends this inner loop.
var nextStep: Traversal = EmptyTraversal
current match {
case MaterializeAtomic(mod, outToSlot) =>
if (Debug) println(s"materializing module: $mod")
// Materialize the module in the current island's phase with the attributes on top of the stack.
val matAndStage = islandTracking.getCurrentPhase.materializeAtomic(mod, attributesStack.getLast)
val logic = matAndStage._1
val matValue = matAndStage._2
if (Debug) println(s" materialized value is $matValue")
matValueStack.addLast(matValue)
// The offset is read before wireInlets runs and then passed to wireOutlets
// (presumably because wiring inlets advances the tracker's offset — TODO confirm).
val stageGlobalOffset = islandTracking.getCurrentOffset
wireInlets(islandTracking, mod, logic)
wireOutlets(islandTracking, mod, logic, stageGlobalOffset, outToSlot)
if (Debug) println(s"PUSH: $matValue => $matValueStack")
case Concat(first, next) =>
// Defer `next` (unless it is empty) and continue immediately with `first`.
if (next ne EmptyTraversal) traversalStack.addLast(next)
nextStep = first
case Pop =>
// Discard the most recently pushed materialized value.
val popped = matValueStack.removeLast()
if (Debug) println(s"POP: $popped => $matValueStack")
case PushNotUsed =>
matValueStack.addLast(NotUsed)
if (Debug) println(s"PUSH: NotUsed => $matValueStack")
case transform: Transform =>
// Replace the top materialized value with the result of applying the transform to it.
val prev = matValueStack.removeLast()
val result = transform(prev)
matValueStack.addLast(result)
if (Debug) println(s"TRFM: $matValueStack")
case compose: Compose =>
// The most recently pushed value is the second argument, the one below it the first.
val second = matValueStack.removeLast()
val first = matValueStack.removeLast()
val result = compose(first, second)
matValueStack.addLast(result)
if (Debug) println(s"COMP: $matValueStack")
case PushAttributes(attr) =>
// New attributes are combined with the current top of the stack rather than replacing it.
attributesStack.addLast(attributesStack.getLast and attr)
if (Debug) println(s"ATTR PUSH: $attr")
case PopAttributes =>
attributesStack.removeLast()
if (Debug) println(s"ATTR POP")
case EnterIsland(tag) =>
islandTracking.enterIsland(tag, attributesStack.getLast)
case ExitIsland =>
islandTracking.exitIsland()
// Any other traversal step is ignored.
case _ =>
}
current = nextStep
}
}
// A def rather than a val, so the exception is only constructed when it is actually thrown.
def shutdownWhileMaterializingFailure =
new IllegalStateException("Materializer shutdown while materializing stream")
try {
islandTracking.getCurrentPhase.onIslandReady()
islandTracking.allNestedIslandsReady()
if (Debug) println("--- Finished materialization")
matValueStack.peekLast().asInstanceOf[Mat]
} finally {
// NOTE(review): because this throw sits in a finally block, it replaces both the normal
// result and any exception raised by the try body whenever the materializer is shut down.
if (isShutdown) throw shutdownWhileMaterializingFailure
}
}