in persistence/src/main/scala/org/apache/pekko/persistence/journal/ReplayFilter.scala [72:168]
def this(persistentActor: ActorRef, mode: ReplayFilter.Mode, windowSize: Int, maxOldWriters: Int) =
this(persistentActor, mode, windowSize, maxOldWriters, debugEnabled = false)
val buffer = new LinkedList[ReplayedMessage]()
val oldWriters = LinkedHashSet.empty[String]
var writerUuid = ""
var seqNo = -1L
def receive = {
case r @ ReplayedMessage(persistent) =>
if (debugEnabled)
log.debug("Replay: {}", persistent)
try {
if (buffer.size == windowSize) {
val msg = buffer.removeFirst()
persistentActor.tell(msg, Actor.noSender)
}
if (r.persistent.writerUuid == writerUuid) {
// from same writer
if (r.persistent.sequenceNr < seqNo) {
val errMsg =
s"Invalid replayed event [sequenceNr=${r.persistent.sequenceNr}, writerUUID=${r.persistent.writerUuid}] as " +
s"the sequenceNr should be equal to or greater than already-processed event [sequenceNr=$seqNo, writerUUID=$writerUuid] from the same writer, for the same persistenceId [${r.persistent.persistenceId}]. " +
"Perhaps, events were journaled out of sequence, or duplicate persistenceId for different entities?"
logIssue(errMsg)
mode match {
case RepairByDiscardOld => // discard
case Fail => throw new IllegalStateException(errMsg)
case Warn => buffer.add(r)
case Disabled => throw new IllegalArgumentException("mode must not be Disabled")
}
} else {
// note that it is alright with == seqNo, since such may be emitted EventSeq
buffer.add(r)
seqNo = r.persistent.sequenceNr
}
} else if (oldWriters.contains(r.persistent.writerUuid)) {
// from old writer
val errMsg =
s"Invalid replayed event [sequenceNr=${r.persistent.sequenceNr}, writerUUID=${r.persistent.writerUuid}]. " +
s"There was already a newer writer whose last replayed event was [sequenceNr=$seqNo, writerUUID=$writerUuid] for the same persistenceId [${r.persistent.persistenceId}]." +
"Perhaps, the old writer kept journaling messages after the new writer created, or duplicate persistenceId for different entities?"
logIssue(errMsg)
mode match {
case RepairByDiscardOld => // discard
case Fail => throw new IllegalStateException(errMsg)
case Warn => buffer.add(r)
case Disabled => throw new IllegalArgumentException("mode must not be Disabled")
}
} else {
// from new writer
if (writerUuid != "")
oldWriters.add(writerUuid)
if (oldWriters.size > maxOldWriters)
oldWriters.remove(oldWriters.head)
writerUuid = r.persistent.writerUuid
seqNo = r.persistent.sequenceNr
// clear the buffer for messages from old writers with higher seqNo
val iter = buffer.iterator()
while (iter.hasNext()) {
val msg = iter.next()
if (msg.persistent.sequenceNr >= seqNo) {
val errMsg =
s"Invalid replayed event [sequenceNr=${r.persistent.sequenceNr}, writerUUID=${r.persistent.writerUuid}] from a new writer. " +
s"An older writer already sent an event [sequenceNr=${msg.persistent.sequenceNr}, writerUUID=${msg.persistent
.writerUuid}] whose sequence number was equal or greater for the same persistenceId [${r.persistent.persistenceId}]. " +
"Perhaps, the new writer journaled the event out of sequence, or duplicate persistenceId for different entities?"
logIssue(errMsg)
mode match {
case RepairByDiscardOld => iter.remove() // discard
case Fail => throw new IllegalStateException(errMsg)
case Warn => // keep
case Disabled => throw new IllegalArgumentException("mode must not be Disabled")
}
}
}
buffer.add(r)
}
} catch {
case e: IllegalStateException if mode == Fail => fail(e)
}
case msg @ (_: RecoverySuccess | _: ReplayMessagesFailure) =>
if (debugEnabled)
log.debug("Replay completed: {}", msg)
sendBuffered()
persistentActor.tell(msg, Actor.noSender)
context.stop(self)
}