in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala [622:756]
private def isAccepted[Envelope](
recordWithOffset: RecordWithOffset,
currentInflight: Map[Pid, SeqNr]): Future[Boolean] = {
val pid = recordWithOffset.record.pid
val seqNr = recordWithOffset.record.seqNr
val currentState = getState()
val duplicate = isDuplicate(recordWithOffset.record)
if (duplicate) {
logger.trace("Filtering out duplicate sequence number [{}] for pid [{}]", seqNr, pid)
FutureFalse
} else if (recordWithOffset.strictSeqNr) {
// strictSeqNr == true is for event sourced
val prevSeqNr = currentInflight.getOrElse(pid, currentState.byPid.get(pid).map(_.seqNr).getOrElse(0L))
def logUnexpected(): Unit = {
if (viaPubSub(recordWithOffset.offset))
logger.debug(
"Rejecting pub-sub envelope, unexpected sequence number [{}] for pid [{}], previous sequence number [{}]. Offset: {}",
seqNr: java.lang.Long,
pid,
prevSeqNr: java.lang.Long,
recordWithOffset.offset)
else if (recordWithOffset.envelopeLoaded)
logger.debug(
"Rejecting unexpected sequence number [{}] for pid [{}], previous sequence number [{}]. Offset: {}",
seqNr: java.lang.Long,
pid,
prevSeqNr: java.lang.Long,
recordWithOffset.offset)
else
logger.warn(
"Rejecting unexpected sequence number [{}] for pid [{}], previous sequence number [{}]. Offset: {}",
seqNr: java.lang.Long,
pid,
prevSeqNr: java.lang.Long,
recordWithOffset.offset)
}
def logUnknown(): Unit = {
if (viaPubSub(recordWithOffset.offset)) {
logger.debug(
"Rejecting pub-sub envelope, unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
seqNr: java.lang.Long,
pid,
recordWithOffset.offset)
} else if (recordWithOffset.envelopeLoaded) {
// This may happen rather frequently when using `publish-events`, after reconnecting and such.
logger.debug(
"Rejecting unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
seqNr: java.lang.Long,
pid,
recordWithOffset.offset)
} else {
logger.warn(
"Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}",
seqNr: java.lang.Long,
pid,
recordWithOffset.offset)
}
}
if (prevSeqNr > 0) {
// expecting seqNr to be +1 of previously known
val ok = seqNr == prevSeqNr + 1
if (ok) {
FutureTrue
} else if (seqNr <= currentInflight.getOrElse(pid, 0L)) {
// currentInFlight contains those that have been processed or about to be processed in Flow,
// but offset not saved yet => ok to handle as duplicate
FutureFalse
} else if (recordWithOffset.envelopeLoaded) {
logUnexpected()
FutureFalse
} else {
logUnexpected()
// This will result in projection restart (with normal configuration)
Future.failed(
new IllegalStateException(
s"Rejected envelope from backtracking, persistenceId [$pid], seqNr [$seqNr] " +
"due to unexpected sequence number. " +
"Please report this issue at https://github.com/apache/incubator-pekko-persistence-r2dbc"))
}
} else if (seqNr == 1) {
// always accept first event if no other event for that pid has been seen
FutureTrue
} else {
// Haven't see seen this pid within the time window. Since events can be missed
// when read at the tail we will only accept it if the event with previous seqNr has timestamp
// before the time window of the offset store.
// Backtracking will emit missed event again.
timestampOf(pid, seqNr - 1).map {
case Some(previousTimestamp) =>
val before = currentState.latestTimestamp.minus(settings.timeWindow)
if (previousTimestamp.isBefore(before)) {
logger.debug(
"Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " +
"is before time window [{}].",
pid,
seqNr: java.lang.Long,
previousTimestamp,
before)
true
} else if (recordWithOffset.envelopeLoaded) {
logUnknown()
false
} else {
logUnknown()
// This will result in projection restart (with normal configuration)
throw new IllegalStateException(
s"Rejected envelope from backtracking, persistenceId [$pid], seqNr [$seqNr], " +
"due to unknown sequence number. " +
"Please report this issue at https://github.com/apache/incubator-pekko-persistence-r2dbc")
}
case None =>
// previous not found, could have been deleted
true
}
}
} else {
// strictSeqNr == false is for durable state where each revision might not be visible
val prevSeqNr = currentInflight.getOrElse(pid, currentState.byPid.get(pid).map(_.seqNr).getOrElse(0L))
val ok = seqNr > prevSeqNr
if (ok) {
FutureTrue
} else {
logger.trace("Filtering out earlier revision [{}] for pid [{}], previous revision [{}]", seqNr: java.lang.Long,
pid,
prevSeqNr: java.lang.Long)
FutureFalse
}
}
}