in actor/src/main/scala/org/apache/pekko/actor/ActorCell.scala [481:546]
/**
 * Processes a single system message for this cell and then keeps going with
 * any system messages that get unstashed as a consequence of the cell's
 * state changing while that message was handled.
 *
 * The incoming message is wrapped in a one-element
 * `EarliestFirstSystemMessageList` and handed to the local `invokeAll` loop
 * together with the state returned by `calculateState`.
 *
 * @param message the system message to process
 */
final def systemInvoke(message: SystemMessage): Unit = {
/*
* When recreate/suspend/resume are received while restarting (i.e. between
* preRestart and postRestart, waiting for children to terminate), these
* must not be executed immediately, but instead queued and released after
* finishRecreate returns. This can only ever be triggered by
* ChildTerminated, and ChildTerminated is not one of the queued message
* types (hence the overwrite further down). Mailbox sets message.next=null
* before systemInvoke, so this will only be non-null during such a replay.
*/
// NOTE(review): the "overwrite further down" mentioned above is not visible
// in this method body; the queueing is done via stash/unstashAll below.
// Confirm the comment above is still accurate.

// Classifies the cell into one of three states. The check for outstanding
// children takes precedence over the mailbox's suspended flag.
def calculateState: Int =
if (waitingForChildrenOrNull ne null) SuspendedWaitForChildrenState
else if (mailbox.isSuspended) SuspendedState
else DefaultState
// Unlinks every message in the list, in list order, and sends each one to
// the provider's dead letters.
@tailrec def sendAllToDeadLetters(messages: EarliestFirstSystemMessageList): Unit =
if (messages.nonEmpty) {
// Take the tail before unlinking the head, so the rest of the list is
// still reachable afterwards.
val tail = messages.tail
val msg = messages.head
msg.unlink()
provider.deadLetters ! msg
sendAllToDeadLetters(tail)
}
// Decides whether `m` must be deferred in the given state: nothing is
// stashed in DefaultState; the two suspended states stash the messages
// carrying the corresponding marker type.
def shouldStash(m: SystemMessage, state: Int): Boolean =
(state: @switch) match {
case DefaultState => false
case SuspendedState => m.isInstanceOf[StashWhenFailed]
case SuspendedWaitForChildrenState => m.isInstanceOf[StashWhenWaitingForChildren]
}
// Main loop: handles the head of `messages` under `currentState`, then
// re-evaluates the state and recurses on whatever is left to do.
@tailrec
def invokeAll(messages: EarliestFirstSystemMessageList, currentState: Int): Unit = {
// As in sendAllToDeadLetters: capture the tail first, then detach the head.
val rest = messages.tail
val message = messages.head
message.unlink()
try {
message match {
// The stash check is the first case, so a message that must be deferred
// in the current state is stashed regardless of its concrete type.
case message: SystemMessage if shouldStash(message, currentState) => stash(message)
case f: Failed => handleFailure(f)
case DeathWatchNotification(a, ec, at) => watchedActorTerminated(a, ec, at)
case Create(failure) => create(failure)
case Watch(watchee, watcher) => addWatcher(watchee, watcher)
case Unwatch(watchee, watcher) => remWatcher(watchee, watcher)
case Recreate(cause) => faultRecreate(cause)
case Suspend() => faultSuspend()
case Resume(inRespToFailure) => faultResume(inRespToFailure)
case Terminate() => terminate()
case Supervise(child, async) => supervise(child, async)
case NoMessage => // only here to suppress warning
}
// Throwables accepted by handleNonFatalOrInterruptedException are routed to
// handleInvokeFailure instead of escaping the loop, so the state
// re-evaluation below still runs after a failed handler.
} catch handleNonFatalOrInterruptedException { e =>
handleInvokeFailure(Nil, e)
}
// The handler above may have changed the cell's state; recompute it.
val newState = calculateState
// As each state accepts a strict subset of another state, it is enough to unstash if we "walk up" the state
// chain
// NOTE(review): the `<` comparison presumably relies on the state constants
// being ordered DefaultState < SuspendedState < SuspendedWaitForChildrenState;
// confirm against their definitions. Unstashed messages are placed in front
// of `rest` (presumably unstashAll() returns them latest-first and
// reversePrepend restores earliest-first order; confirm).
val todo = if (newState < currentState) rest.reversePrepend(unstashAll()) else rest
// Once the cell reports itself terminated, nothing further is processed:
// everything still pending (including anything just unstashed) goes to
// dead letters.
if (isTerminated) sendAllToDeadLetters(todo)
else if (todo.nonEmpty) invokeAll(todo, newState)
}
// Entry point: start the loop with just the incoming message and the
// cell's current state.
invokeAll(new EarliestFirstSystemMessageList(message), calculateState)
}