def this()

in persistence/src/main/scala/org/apache/pekko/persistence/journal/ReplayFilter.scala [72:168]


  /**
   * Convenience constructor without the debug flag: delegates to the
   * five-argument constructor, passing `debugEnabled = false`.
   */
  def this(persistentActor: ActorRef, mode: ReplayFilter.Mode, windowSize: Int, maxOldWriters: Int) =
    this(persistentActor, mode, windowSize, maxOldWriters, debugEnabled = false)

  // Window of replayed messages that have not yet been forwarded to persistentActor.
  // `receive` releases the oldest entry whenever the window already holds windowSize messages.
  val buffer = new LinkedList[ReplayedMessage]()
  // UUIDs of writers that were replaced by a newer writer. `receive` drops the
  // first (oldest-inserted) entry once the set grows beyond maxOldWriters.
  val oldWriters = LinkedHashSet.empty[String]
  // UUID of the writer currently treated as the newest one; "" means none has been recorded yet.
  var writerUuid = ""
  // Sequence number of the last message accepted from the current writer; -1 before any message.
  var seqNo = -1L

  /**
   * Message handler of the replay filter.
   *
   * For every `ReplayedMessage`:
   *  - if the buffer already holds `windowSize` messages, the oldest one is
   *    forwarded to `persistentActor` (with no sender) to make room;
   *  - the message is then classified by its `writerUuid`:
   *    - '''same writer''' as the current one: accepted when its sequence number is
   *      not lower than `seqNo`, otherwise treated as an issue;
   *    - '''known old writer''' (present in `oldWriters`): always treated as an issue;
   *    - '''new writer''': the previous writer is remembered in `oldWriters`, the new
   *      one becomes current, and every buffered message whose sequence number is
   *      equal to or greater than the new message's is treated as an issue.
   *
   * An issue is reported through `logIssue` and then handled according to `mode`:
   * `RepairByDiscardOld` drops the offending message, `Warn` keeps it, `Fail` throws an
   * `IllegalStateException` that is caught below and passed to `fail`, and `Disabled`
   * throws an `IllegalArgumentException` (which is not caught here).
   *
   * On `RecoverySuccess` or `ReplayMessagesFailure`, `sendBuffered()` is invoked, the
   * message itself is forwarded to `persistentActor`, and this actor stops itself.
   */
  def receive = {
    case r @ ReplayedMessage(persistent) =>
      if (debugEnabled)
        log.debug("Replay: {}", persistent)
      try {
        // Window is full: release the oldest buffered message before handling the new one.
        if (buffer.size == windowSize) {
          val msg = buffer.removeFirst()
          persistentActor.tell(msg, Actor.noSender)
        }

        if (r.persistent.writerUuid == writerUuid) {
          // from same writer
          if (r.persistent.sequenceNr < seqNo) {
            val errMsg =
              s"Invalid replayed event [sequenceNr=${r.persistent.sequenceNr}, writerUUID=${r.persistent.writerUuid}] as " +
              s"the sequenceNr should be equal to or greater than already-processed event [sequenceNr=$seqNo, writerUUID=$writerUuid] from the same writer, for the same persistenceId [${r.persistent.persistenceId}]. " +
              "Perhaps, events were journaled out of sequence, or duplicate persistenceId for different entities?"
            logIssue(errMsg)
            mode match {
              case RepairByDiscardOld => // discard
              case Fail               => throw new IllegalStateException(errMsg)
              case Warn               => buffer.add(r)
              case Disabled           => throw new IllegalArgumentException("mode must not be Disabled")
            }
          } else {
            // note that it is alright with == seqNo, since such may be emitted EventSeq
            buffer.add(r)
            seqNo = r.persistent.sequenceNr
          }

        } else if (oldWriters.contains(r.persistent.writerUuid)) {
          // from old writer
          // The middle segment ends with ". " so that the sentences of the final
          // message are separated by a space, like in the other two error messages.
          val errMsg =
            s"Invalid replayed event [sequenceNr=${r.persistent.sequenceNr}, writerUUID=${r.persistent.writerUuid}]. " +
            s"There was already a newer writer whose last replayed event was [sequenceNr=$seqNo, writerUUID=$writerUuid] for the same persistenceId [${r.persistent.persistenceId}]. " +
            "Perhaps, the old writer kept journaling messages after the new writer created, or duplicate persistenceId for different entities?"
          logIssue(errMsg)
          mode match {
            case RepairByDiscardOld => // discard
            case Fail               => throw new IllegalStateException(errMsg)
            case Warn               => buffer.add(r)
            case Disabled           => throw new IllegalArgumentException("mode must not be Disabled")
          }

        } else {
          // from new writer
          // Remember the writer being replaced ("" means there was none yet) and
          // keep the set of remembered writers bounded by evicting its first entry.
          if (writerUuid != "")
            oldWriters.add(writerUuid)
          if (oldWriters.size > maxOldWriters)
            oldWriters.remove(oldWriters.head)

          writerUuid = r.persistent.writerUuid
          seqNo = r.persistent.sequenceNr

          // check the buffer for messages from previous writers whose seqNo is equal to
          // or higher than the new writer's first seqNo
          val iter = buffer.iterator()
          while (iter.hasNext()) {
            val msg = iter.next()
            if (msg.persistent.sequenceNr >= seqNo) {
              val errMsg =
                s"Invalid replayed event [sequenceNr=${r.persistent.sequenceNr}, writerUUID=${r.persistent.writerUuid}] from a new writer. " +
                s"An older writer already sent an event [sequenceNr=${msg.persistent.sequenceNr}, writerUUID=${msg.persistent.writerUuid}] whose sequence number was equal or greater for the same persistenceId [${r.persistent.persistenceId}]. " +
                "Perhaps, the new writer journaled the event out of sequence, or duplicate persistenceId for different entities?"
              logIssue(errMsg)
              mode match {
                case RepairByDiscardOld => iter.remove() // discard
                case Fail               => throw new IllegalStateException(errMsg)
                case Warn               => // keep
                case Disabled           => throw new IllegalArgumentException("mode must not be Disabled")
              }

            }
          }

          buffer.add(r)
        }

      } catch {
        // Only the IllegalStateException raised for Fail mode is handled here;
        // anything else propagates to the actor's supervisor.
        case e: IllegalStateException if mode == Fail => fail(e)
      }

    case msg @ (_: RecoverySuccess | _: ReplayMessagesFailure) =>
      if (debugEnabled)
        log.debug("Replay completed: {}", msg)
      // sendBuffered() is defined outside this view; presumably it delivers the
      // messages still held in `buffer` before the completion signal is forwarded.
      sendBuffered()
      persistentActor.tell(msg, Actor.noSender)
      context.stop(self)
  }