in persistence/src/main/scala/org/apache/pekko/persistence/journal/AsyncWriteJournal.scala [65:209]
  final def receive = receiveWriteJournal.orElse[Any, Unit](receivePluginInternal)

  final val receiveWriteJournal: Actor.Receive = {
    // cannot be a val in the trait due to binary compatibility
    val replayDebugEnabled: Boolean = config.getBoolean("replay-filter.debug")
    val eventStream = context.system.eventStream // used from Future callbacks
    implicit val ec: ExecutionContext = context.dispatcher

    {
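      // The three requests of the journal protocol are handled below: WriteMessages
      // (persist calls), ReplayMessages (recovery) and DeleteMessagesTo (deletion).
      // Plugin calls are guarded by the circuit breaker where possible, and write
      // replies are ordered by the Resequencer.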
      case WriteMessages(messages, persistentActor, actorInstanceId) =>
        val cctr = resequencerCounter
        resequencerCounter += messages.foldLeft(1)((acc, m) => acc + m.size)

        val atomicWriteCount = messages.count(_.isInstanceOf[AtomicWrite])
        val prepared = Try(preparePersistentBatch(messages))
        val writeResult = (prepared match {
          case Success(prep) if prep.isEmpty =>
            // prep is empty when all messages are instances of NonPersistentRepr (used for defer);
            // in that case we continue right away without calling the journal plugin
            // (most plugins fail when calling head on an empty Seq).
            // Ordering of the replies is handled by the Resequencer.
            Future.successful(Nil)
          case Success(prep) =>
            // try in case asyncWriteMessages throws
            try breaker.withCircuitBreaker(asyncWriteMessages(prep))
            catch { case NonFatal(e) => Future.failed(e) }
          case f @ Failure(_) =>
            // exception from preparePersistentBatch => all atomic writes rejected
            Future.successful(messages.collect { case _: AtomicWrite => f })
        }).map { results =>
          if (results.nonEmpty && results.size != atomicWriteCount)
            throw new IllegalStateException(
              "asyncWriteMessages returned invalid number of results. " +
              s"Expected [${prepared.get.size}], but got [${results.size}]")
          results
        }
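        // The check above enforces the asyncWriteMessages contract: the returned Seq is
        // either empty or contains exactly one Try per AtomicWrite, in the same order.
        // A Failure element marks that write as rejected, whereas a failed Future fails
        // the whole batch. A minimal sketch of a conforming plugin, assuming a
        // hypothetical synchronous `store(w: AtomicWrite): Unit`:
        //
        //   def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
        //     Future {
        //       messages.map(w => Try(store(w))) // one result per AtomicWrite, in order
        //     }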
        writeResult.onComplete {
          case Success(results) =>
            resequencer ! Desequenced(WriteMessagesSuccessful, cctr, persistentActor, self)

            val resultsIter =
              if (results.isEmpty) Iterator.fill(atomicWriteCount)(AsyncWriteJournal.successUnit)
              else results.iterator
            var n = cctr + 1
            messages.foreach {
              case a: AtomicWrite =>
                resultsIter.next() match {
                  case Success(_) =>
                    a.payload.foreach { p =>
                      resequencer ! Desequenced(WriteMessageSuccess(p, actorInstanceId), n, persistentActor, p.sender)
                      n += 1
                    }
                  case Failure(e) =>
                    a.payload.foreach { p =>
                      resequencer ! Desequenced(
                        WriteMessageRejected(p, e, actorInstanceId),
                        n,
                        persistentActor,
                        p.sender)
                      n += 1
                    }
                }

              case r: NonPersistentRepr =>
                resequencer ! Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender)
                n += 1
            }

          case Failure(e) =>
            resequencer ! Desequenced(WriteMessagesFailed(e, atomicWriteCount), cctr, persistentActor, self)
            var n = cctr + 1
            messages.foreach {
              case a: AtomicWrite =>
                a.payload.foreach { p =>
                  resequencer ! Desequenced(WriteMessageFailure(p, e, actorInstanceId), n, persistentActor, p.sender)
                  n += 1
                }
              case r: NonPersistentRepr =>
                resequencer ! Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender)
                n += 1
            }
        }
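        // Sequence-number accounting, by example: for messages = [AtomicWrite(p1, p2),
        // NonPersistentRepr(x)] and cctr = 5, slot 5 carries WriteMessagesSuccessful (or
        // WriteMessagesFailed), slots 6 and 7 carry the replies for p1 and p2, and slot 8
        // the one for x; resequencerCounter advanced by 1 + 3 payloads = 4, matching the
        // foldLeft above.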
      case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor) =>
        val replyTo =
          if (isReplayFilterEnabled)
            context.actorOf(
              ReplayFilter.props(
                persistentActor,
                replayFilterMode,
                replayFilterWindowSize,
                replayFilterMaxOldWriters,
                replayDebugEnabled))
          else persistentActor
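        // When enabled, the ReplayFilter actor is interposed between the journal and
        // the persistent actor, so that replayed events written by old, superseded
        // writers can be detected and handled according to replayFilterMode.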
        val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1)
        /*
         * The API docs for [[AsyncRecovery]] say not to rely on asyncReadHighestSequenceNr
         * being called before a call to asyncReplayMessages, even though it currently always is.
         * The Cassandra plugin does rely on this, so if you change this, change the Cassandra
         * plugin as well.
         */
        breaker
          .withCircuitBreaker(asyncReadHighestSequenceNr(persistenceId, readHighestSequenceNrFrom))
          .flatMap { highSeqNr =>
            val toSeqNr = math.min(toSequenceNr, highSeqNr)
            if (toSeqNr <= 0L || fromSequenceNr > toSeqNr)
              Future.successful(highSeqNr)
            else {
              // Send replayed messages and the replay result to persistentActor directly.
              // No need to resequence replayed messages relative to written and looped
              // messages. Not possible to use the circuit breaker here.
              asyncReplayMessages(persistenceId, fromSequenceNr, toSeqNr, max) { p =>
                if (!p.deleted) // old records from Akka 2.3 may still have the deleted flag
                  adaptFromJournal(p).foreach { adaptedPersistentRepr =>
                    replyTo.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender)
                  }
              }.map(_ => highSeqNr)
            }
          }
          .map { highSeqNr =>
            RecoverySuccess(highSeqNr)
          }
          .recover {
            case e => ReplayMessagesFailure(e)
          }
          .pipeTo(replyTo)
          .foreach { _ =>
            if (publish) eventStream.publish(r)
          }
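        // Example: with fromSequenceNr = 1, toSequenceNr = Long.MaxValue and a highest
        // stored sequence number of 42, toSeqNr is clamped to 42, events 1..42 are
        // replayed to replyTo, and recovery completes with RecoverySuccess(42). With no
        // stored events (highSeqNr = 0) the replay step is skipped and RecoverySuccess(0)
        // is piped back directly.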
      case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor) =>
        breaker
          .withCircuitBreaker(asyncDeleteMessagesTo(persistenceId, toSequenceNr))
          .map { _ =>
            DeleteMessagesSuccess(toSequenceNr)
          }
          .recover {
            case e => DeleteMessagesFailure(e, toSequenceNr)
          }
          .pipeTo(persistentActor)
          .onComplete { _ =>
            if (publish) eventStream.publish(d)
          }
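        // A minimal sketch of a conforming asyncDeleteMessagesTo, assuming a hypothetical
        // in-memory `var store: Map[String, Vector[PersistentRepr]]` held by the plugin
        // (real plugins must still answer asyncReadHighestSequenceNr correctly afterwards):
        //
        //   def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] =
        //     Future {
        //       store = store.updatedWith(persistenceId)(_.map(_.filter(_.sequenceNr > toSequenceNr)))
        //     }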
    }
  }