override def shutdown()

in stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala [420:575]


  override def shutdown(): Unit =
    if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill

  override def isShutdown: Boolean = haveShutDown.get()

  override def withNamePrefix(name: String): PhasedFusingActorMaterializer = this.copy(flowNames = flowNames.copy(name))

  private[this] def createFlowName(): String = flowNames.next()

  // note that this will never be overridden on a per-graph-stage basis regardless of more specific attributes
  override lazy val executionContext: ExecutionContextExecutor =
    dispatchers.lookup(defaultAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)

  override def scheduleWithFixedDelay(
      initialDelay: FiniteDuration,
      delay: FiniteDuration,
      task: Runnable): Cancellable =
    system.scheduler.scheduleWithFixedDelay(initialDelay, delay)(task)(executionContext)

  override def scheduleAtFixedRate(
      initialDelay: FiniteDuration,
      interval: FiniteDuration,
      task: Runnable): Cancellable =
    system.scheduler.scheduleAtFixedRate(initialDelay, interval)(task)(executionContext)

  override def schedulePeriodically(
      initialDelay: FiniteDuration,
      interval: FiniteDuration,
      task: Runnable): Cancellable =
    system.scheduler.scheduleAtFixedRate(initialDelay, interval)(task)(executionContext)

  override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
    system.scheduler.scheduleOnce(delay, task)(executionContext)

  override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat =
    materialize(_runnableGraph, defaultAttributes)

  @InternalStableApi
  override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat], defaultAttributes: Attributes): Mat =
    materialize(
      _runnableGraph,
      defaultAttributes,
      PhasedFusingActorMaterializer.DefaultPhase,
      PhasedFusingActorMaterializer.DefaultPhases)

  override def materialize[Mat](
      graph: Graph[ClosedShape, Mat],
      defaultAttributes: Attributes,
      defaultPhase: Phase[Any],
      phases: Map[IslandTag, Phase[Any]]): Mat = {
    if (isShutdown) throw new IllegalStateException("Trying to materialize stream after materializer has been shutdown")

    // combine default attributes with top-level runnable/closed graph shape attributes so that per-stream
    // attributes overriding defaults are used also for the top level interpreter etc.
    val defaultAndGraphAttributes = defaultAttributes and graph.traversalBuilder.attributes
    if (defaultAndGraphAttributes.mandatoryAttribute[ActorAttributes.FuzzingMode].enabled && !fuzzingWarningDisabled) {
      _logger.warning(
        "Fuzzing mode is enabled on this system. If you see this warning on your production system then " +
        "set 'pekko.stream.materializer.debug.fuzzing-mode' to off.")
    }

    val islandTracking = new IslandTracking(
      phases,
      settings,
      defaultAndGraphAttributes,
      defaultPhase,
      this,
      islandNamePrefix = createFlowName() + "-")

    var current: Traversal = graph.traversalBuilder.traversal
    val attributesStack = new java.util.ArrayDeque[Attributes](8)
    attributesStack.addLast(defaultAndGraphAttributes)

    val traversalStack = new java.util.ArrayDeque[Traversal](16)
    traversalStack.addLast(current)

    val matValueStack = new java.util.ArrayDeque[Any](8)

    if (Debug) {
      println(s"--- Materializing layout:")
      TraversalBuilder.printTraversal(current)
      println(s"--- Start materialization")
    }

    // Due to how Concat works, we need a stack. This probably can be optimized for the most common cases.
    while (!traversalStack.isEmpty) {
      current = traversalStack.removeLast()

      while (current ne EmptyTraversal) {
        var nextStep: Traversal = EmptyTraversal
        current match {
          case MaterializeAtomic(mod, outToSlot) =>
            if (Debug) println(s"materializing module: $mod")
            val matAndStage = islandTracking.getCurrentPhase.materializeAtomic(mod, attributesStack.getLast)
            val logic = matAndStage._1
            val matValue = matAndStage._2
            if (Debug) println(s"  materialized value is $matValue")
            matValueStack.addLast(matValue)

            val stageGlobalOffset = islandTracking.getCurrentOffset

            wireInlets(islandTracking, mod, logic)
            wireOutlets(islandTracking, mod, logic, stageGlobalOffset, outToSlot)

            if (Debug) println(s"PUSH: $matValue => $matValueStack")

          case Concat(first, next) =>
            if (next ne EmptyTraversal) traversalStack.addLast(next)
            nextStep = first
          case Pop =>
            val popped = matValueStack.removeLast()
            if (Debug) println(s"POP: $popped => $matValueStack")
          case PushNotUsed =>
            matValueStack.addLast(NotUsed)
            if (Debug) println(s"PUSH: NotUsed => $matValueStack")
          case transform: Transform =>
            val prev = matValueStack.removeLast()
            val result = transform(prev)
            matValueStack.addLast(result)
            if (Debug) println(s"TRFM: $matValueStack")
          case compose: Compose =>
            val second = matValueStack.removeLast()
            val first = matValueStack.removeLast()
            val result = compose(first, second)
            matValueStack.addLast(result)
            if (Debug) println(s"COMP: $matValueStack")
          case PushAttributes(attr) =>
            attributesStack.addLast(attributesStack.getLast and attr)
            if (Debug) println(s"ATTR PUSH: $attr")
          case PopAttributes =>
            attributesStack.removeLast()
            if (Debug) println(s"ATTR POP")
          case EnterIsland(tag) =>
            islandTracking.enterIsland(tag, attributesStack.getLast)
          case ExitIsland =>
            islandTracking.exitIsland()
          case _ =>
        }
        current = nextStep
      }
    }

    def shutdownWhileMaterializingFailure =
      new IllegalStateException("Materializer shutdown while materializing stream")
    try {
      islandTracking.getCurrentPhase.onIslandReady()
      islandTracking.allNestedIslandsReady()

      if (Debug) println("--- Finished materialization")
      matValueStack.peekLast().asInstanceOf[Mat]

    } finally {
      if (isShutdown) throw shutdownWhileMaterializingFailure
    }

  }