final def receive = receiveWriteJournal.orElse[Any, Unit](receivePluginInternal)

in persistence/src/main/scala/org/apache/pekko/persistence/journal/AsyncWriteJournal.scala [65:209]


  /**
   * The actor's message handler: every message is first offered to
   * `receiveWriteJournal`; anything it is not defined for falls through to
   * `receivePluginInternal`.
   */
  final def receive: Actor.Receive = receiveWriteJournal.orElse[Any, Unit](receivePluginInternal)

  /**
   * Message handler for the journal requests matched below: `WriteMessages`,
   * `ReplayMessages` and `DeleteMessagesTo`.
   *
   *  - `WriteMessages`: reserves a contiguous range of resequencer positions, invokes
   *    `asyncWriteMessages` through the circuit breaker and, once the write completes,
   *    sends one batch-level reply plus one reply per payload to `resequencer`.
   *  - `ReplayMessages`: reads the highest sequence number, then replays the requested
   *    range to the persistent actor (or to a `ReplayFilter` child placed in front of it)
   *    and pipes the final `RecoverySuccess` / `ReplayMessagesFailure` to the same target.
   *  - `DeleteMessagesTo`: invokes `asyncDeleteMessagesTo` through the circuit breaker and
   *    pipes `DeleteMessagesSuccess` / `DeleteMessagesFailure` to the persistent actor.
   *
   * All asynchronous work is chained on `context.dispatcher`.
   */
  final val receiveWriteJournal: Actor.Receive = {
    // cannot be a val in the trait due to binary compatibility
    val replayDebugEnabled: Boolean = config.getBoolean("replay-filter.debug")
    val eventStream = context.system.eventStream // used from Future callbacks
    implicit val ec: ExecutionContext = context.dispatcher

    {
      case WriteMessages(messages, persistentActor, actorInstanceId) =>
        // `cctr` is the first resequencer position of this batch. The counter is advanced by
        // 1 (for the batch-level reply sent at `cctr`) plus `m.size` for every message, which
        // matches the per-payload / per-NonPersistentRepr replies numbered from `cctr + 1` below.
        val cctr = resequencerCounter
        resequencerCounter += messages.foldLeft(1)((acc, m) => acc + m.size)

        // Number of AtomicWrite envelopes in the batch; the plugin is expected to return
        // either no results at all or exactly this many.
        val atomicWriteCount = messages.count(_.isInstanceOf[AtomicWrite])
        // Wrapped in Try so that an exception from preparePersistentBatch becomes a rejection
        // of every AtomicWrite instead of escaping the receive block.
        val prepared = Try(preparePersistentBatch(messages))
        val writeResult = (prepared match {
          case Success(prep) if prep.isEmpty =>
            // prep is empty when all messages are instances of NonPersistentRepr (used for defer) in that case,
            // we continue right away without calling the journal plugin (most plugins fail calling head on empty Seq).
            // Ordering of the replies is handled by Resequencer
            Future.successful(Nil)
          case Success(prep) =>
            // try in case the asyncWriteMessages throws
            try breaker.withCircuitBreaker(asyncWriteMessages(prep))
            catch { case NonFatal(e) => Future.failed(e) }
          case f @ Failure(_) =>
            // exception from preparePersistentBatch => rejected
            Future.successful(messages.collect { case _: AtomicWrite => f })
        }).map { results =>
          // An empty result is accepted (it is expanded to all-success further down); a
          // non-empty result must contain one entry per AtomicWrite, otherwise the whole
          // write is failed with an IllegalStateException.
          if (results.nonEmpty && results.size != atomicWriteCount)
            throw new IllegalStateException(
              "asyncWriteMessages returned invalid number of results. " +
              s"Expected [${prepared.get.size}], but got [${results.size}]")
          results
        }

        writeResult.onComplete {
          case Success(results) =>
            // Batch-level acknowledgement occupies position `cctr`.
            resequencer ! Desequenced(WriteMessagesSuccessful, cctr, persistentActor, self)

            // Empty results mean "every AtomicWrite succeeded": substitute one success per write.
            val resultsIter =
              if (results.isEmpty) Iterator.fill(atomicWriteCount)(AsyncWriteJournal.successUnit)
              else results.iterator
            // `n` walks the positions reserved above, in the original message order.
            var n = cctr + 1
            messages.foreach {
              case a: AtomicWrite =>
                // One result per AtomicWrite, applied to each of its payloads.
                resultsIter.next() match {
                  case Success(_) =>
                    a.payload.foreach { p =>
                      resequencer ! Desequenced(WriteMessageSuccess(p, actorInstanceId), n, persistentActor, p.sender)
                      n += 1
                    }
                  case Failure(e) =>
                    // A failed entry inside a successful write is reported as a rejection.
                    a.payload.foreach { p =>
                      resequencer ! Desequenced(
                        WriteMessageRejected(p, e, actorInstanceId),
                        n,
                        persistentActor,
                        p.sender)
                      n += 1
                    }
                }

              case r: NonPersistentRepr =>
                // Not persisted: looped back to the persistent actor in order.
                resequencer ! Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender)
                n += 1
            }

          case Failure(e) =>
            // The write as a whole failed: batch-level failure at `cctr`, then a
            // WriteMessageFailure for every payload of every AtomicWrite.
            resequencer ! Desequenced(WriteMessagesFailed(e, atomicWriteCount), cctr, persistentActor, self)
            var n = cctr + 1
            messages.foreach {
              case a: AtomicWrite =>
                a.payload.foreach { p =>
                  resequencer ! Desequenced(WriteMessageFailure(p, e, actorInstanceId), n, persistentActor, p.sender)
                  n += 1
                }
              case r: NonPersistentRepr =>
                // NonPersistentRepr messages are still looped back as successes.
                resequencer ! Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender)
                n += 1
            }
        }

      case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor) =>
        // Replayed messages and the final result go to `replyTo`: either a ReplayFilter child
        // created for this replay, or the persistent actor itself when the filter is disabled.
        val replyTo =
          if (isReplayFilterEnabled)
            context.actorOf(
              ReplayFilter.props(
                persistentActor,
                replayFilterMode,
                replayFilterWindowSize,
                replayFilterMaxOldWriters,
                replayDebugEnabled))
          else persistentActor

        // Lower bound passed to asyncReadHighestSequenceNr: one below the replay start, never negative.
        val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1)
        /*
         * The API docs for the [[AsyncRecovery]] say not to rely on asyncReadHighestSequenceNr
         * being called before a call to asyncReplayMessages even tho it currently always is. The Cassandra
         * plugin does rely on this so if you change this change the Cassandra plugin.
         */
        // Chain: read highest seqNr -> replay up to min(toSequenceNr, highest) unless the range
        // is empty -> RecoverySuccess, or ReplayMessagesFailure on any failure -> pipe to
        // `replyTo` -> publish the request on the event stream when `publish` is set
        // (`foreach` only runs for a successfully completed future).
        breaker
          .withCircuitBreaker(asyncReadHighestSequenceNr(persistenceId, readHighestSequenceNrFrom))
          .flatMap { highSeqNr =>
            val toSeqNr = math.min(toSequenceNr, highSeqNr)
            if (toSeqNr <= 0L || fromSequenceNr > toSeqNr)
              Future.successful(highSeqNr)
            else {
              // Send replayed messages and replay result to persistentActor directly. No need
              // to resequence replayed messages relative to written and looped messages.
              // not possible to use circuit breaker here
              asyncReplayMessages(persistenceId, fromSequenceNr, toSeqNr, max) { p =>
                if (!p.deleted) // old records from Akka 2.3 may still have the deleted flag
                  adaptFromJournal(p).foreach { adaptedPersistentRepr =>
                    replyTo.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender)
                  }
              }.map(_ => highSeqNr)
            }
          }
          .map { highSeqNr =>
            RecoverySuccess(highSeqNr)
          }
          .recover {
            case e => ReplayMessagesFailure(e)
          }
          .pipeTo(replyTo)
          .foreach { _ =>
            if (publish) eventStream.publish(r)
          }

      case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor) =>
        // Delete through the circuit breaker, reply with success or failure to the persistent
        // actor, then publish the request on the event stream when `publish` is set
        // (`onComplete` runs whatever the outcome).
        breaker
          .withCircuitBreaker(asyncDeleteMessagesTo(persistenceId, toSequenceNr))
          .map { _ =>
            DeleteMessagesSuccess(toSequenceNr)
          }
          .recover {
            case e => DeleteMessagesFailure(e, toSequenceNr)
          }
          .pipeTo(persistentActor)
          .onComplete { _ =>
            if (publish) eventStream.publish(d)
          }
    }
  }