in persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala [131:249]
/**
 * Creates the behavior that runs this event sourced actor.
 *
 * The logger name, the settings, the stash state and the signal handler are prepared, and `initialize`
 * is called, before the supervised block is built. The `BehaviorSetup` and the message interceptor are
 * created inside `Behaviors.setup`, which is supervised for `JournalFailureException` with the
 * configured `supervisionStrategy`.
 *
 * @param context the typed actor context the behavior is started in
 * @return the supervised behavior, narrowed to the user facing `Command` protocol
 */
override def apply(context: typed.TypedActorContext[Command]): Behavior[Command] = {
  val actorCtx = context.asScala

  // install our logger name unless the context reports that a custom one is already in place
  actorCtx match {
    case impl: ActorContextImpl[_] if impl.hasCustomLoggerName => ()
    case _ => actorCtx.setLoggerName(loggerClass)
  }

  val pluginSettings = EventSourcedSettings(
    actorCtx.system,
    journalPluginId.getOrElse(""),
    snapshotPluginId.getOrElse(""),
    journalPluginConfig,
    snapshotPluginConfig)

  // the same context, typed for the internal protocol; shared by the stash state and the behavior setup
  val internalCtx = actorCtx.asInstanceOf[ActorContext[InternalProtocol]]

  // created outside of supervise because the StashState should survive restarts due to persist failures
  val sharedStashState = new StashState(internalCtx, pluginSettings)

  // This method ensures that the MDC is set before we use the internal logger
  def internalLogger() = {
    // MDC is cleared (if used) from aroundReceive in ActorAdapter after processing each message,
    // so `log` is always touched here to mark the MDC as used, whichever logger ends up being returned
    val contextLogger = actorCtx.log
    if (pluginSettings.useContextLoggerForInternalLogging) contextLogger else loggerForInternal
  }

  // fallback for every signal the user supplied signal handler is not defined for
  val defaultSignalHandler: PartialFunction[(State, Signal), Unit] = {
    case (_, SnapshotCompleted(meta)) =>
      internalLogger().debug("Save snapshot successful, snapshot metadata [{}].", meta)

    case (_, SnapshotFailed(meta, failure)) =>
      internalLogger().error(
        s"Save snapshot failed, snapshot metadata [$meta] due to: ${failure.getMessage}",
        failure)

    case (_, DeleteSnapshotsCompleted(DeletionTarget.Individual(meta))) =>
      internalLogger().debug("Persistent snapshot [{}] deleted successfully.", meta)

    case (_, DeleteSnapshotsCompleted(DeletionTarget.Criteria(criteria))) =>
      internalLogger().debug("Persistent snapshots given criteria [{}] deleted successfully.", criteria)

    case (_, DeleteSnapshotsFailed(DeletionTarget.Individual(meta), failure)) =>
      internalLogger().warn2("Failed to delete snapshot with meta [{}] due to: {}", meta, failure.getMessage)

    case (_, DeleteSnapshotsFailed(DeletionTarget.Criteria(criteria), failure)) =>
      internalLogger().warn2("Failed to delete snapshots given criteria [{}] due to: {}", criteria, failure.getMessage)

    case (_, DeleteEventsCompleted(toSequenceNr)) =>
      internalLogger().debug("Events successfully deleted to sequence number [{}].", toSequenceNr)

    case (_, DeleteEventsFailed(toSequenceNr, failure)) =>
      internalLogger().warn2("Failed to delete events to sequence number [{}] due to: {}", toSequenceNr, failure.getMessage)

    case (_, EventSourcedBehaviorImpl.GetPersistenceId(replyTo)) =>
      replyTo ! persistenceId
  }

  // the user supplied handler gets the first chance, the default handler is always the fallback
  val actualSignalHandler: PartialFunction[(State, Signal), Unit] =
    signalHandler.orElse(defaultSignalHandler)

  // do this once, even if the actor is restarted
  initialize(context.asScala)

  val eventSourcedBehavior: Behavior[Command] = Behaviors.setup[Command] { _ =>
    val behaviorSetup = new BehaviorSetup(
      internalCtx,
      persistenceId,
      emptyState,
      commandHandler,
      eventHandler,
      WriterIdentity.newIdentity(),
      actualSignalHandler,
      tagger,
      eventAdapter,
      snapshotAdapter,
      snapshotWhen,
      recovery,
      retention,
      holdingRecoveryPermit = false,
      settings = pluginSettings,
      stashState = sharedStashState,
      replication = replication,
      publishEvents = publishEvents,
      internalLoggerFactory = () => internalLogger())

    // The interceptor needs to accept Any since we can also get messages from the journal,
    // which are not part of the user facing Command protocol.
    // Kept as a method so that every invocation of the factory below builds a fresh instance.
    def newInterceptor(): BehaviorInterceptor[Any, InternalProtocol] =
      new BehaviorInterceptor[Any, InternalProtocol] {
        import BehaviorInterceptor._

        // wraps whatever arrives into the internal protocol before handing it to the inner behavior
        override def aroundReceive(
            ctx: typed.TypedActorContext[Any],
            msg: Any,
            target: ReceiveTarget[InternalProtocol]): Behavior[InternalProtocol] = {
          val internalMsg: InternalProtocol = msg match {
            case response: JournalProtocol.Response =>
              InternalProtocol.JournalResponse(response)
            case response: SnapshotProtocol.Response =>
              InternalProtocol.SnapshotterResponse(response)
            case RecoveryPermitter.RecoveryPermitGranted =>
              InternalProtocol.RecoveryPermitGranted
            case alreadyInternal: InternalProtocol =>
              alreadyInternal // such as RecoveryTickEvent
            case command =>
              InternalProtocol.IncomingCommand(command.asInstanceOf[Command])
          }
          target(ctx, internalMsg)
        }

        // releases resources on PostStop, then always forwards the signal to the inner behavior
        override def aroundSignal(
            ctx: typed.TypedActorContext[Any],
            signal: Signal,
            target: SignalTarget[InternalProtocol]): Behavior[InternalProtocol] = {
          if (signal == PostStop) {
            behaviorSetup.cancelRecoveryTimer()
            // clear stash to be GC friendly
            sharedStashState.clearStashBuffers()
          }
          target(ctx, signal)
        }

        override def toString: String = "EventSourcedBehaviorInterceptor"
      }

    Behaviors.intercept(() => newInterceptor())(RequestingRecoveryPermit(behaviorSetup)).narrow
  }

  Behaviors.supervise(eventSourcedBehavior).onFailure[JournalFailureException](supervisionStrategy)
}