in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBRecovery.scala [294:374]
/**
 * Reads the highest or lowest sequence number recorded for `persistenceId`.
 *
 * The stored values are fetched in batches (`readSequenceNrBatches`) and the largest value found
 * across all batches is used as the starting point. A batch whose read fails is logged and counted
 * as `-1`, so the remaining batches can still produce a result. For `highest = true` the starting
 * point is refined by scanning the partition it points to (and, with `Fixes.HighDistrust`, the
 * following partitions as long as each one is completely filled).
 *
 * @param persistenceId the persistence id whose sequence number is looked up
 * @param highest       `true` to resolve the highest sequence number, `false` for the lowest
 * @return a future with the sequence number; it fails with `DynamoDBJournalFailure` when no batch
 *         yielded a usable value
 */
def readSequenceNr(persistenceId: String, highest: Boolean): Future[Long] = {
  if (Tracing) log.debug("readSequenceNr(highest={}, persistenceId={})", highest, persistenceId)
  val keyGroups = readSequenceNrBatches(persistenceId, highest).map(_.map(getMaxSeqNr).recover {
    // Only non-fatal failures are downgraded to the -1 marker; the cause is logged, not dropped.
    case scala.util.control.NonFatal(ex) =>
      log.warning("readSequenceNr(persistenceId={}) ignoring failed batch read: {}", persistenceId, ex)
      -1L
  })
  Future.sequence(keyGroups).flatMap { seq =>
    // An empty set of batches is treated like "every batch failed" instead of throwing from `max`.
    seq.reduceOption(_ max _).getOrElse(-1L) match {
      case -1L =>
        val highOrLow = if (highest) "highest" else "lowest"
        Future.failed(
          new DynamoDBJournalFailure(s"cannot read $highOrLow sequence number for persistenceId $persistenceId"))
      case start =>
        if (highest) {
          /*
           * When reading the highest sequence number the stored value always points to the Sort=0 entry
           * for which it was written, all other entries do not update the highest value. Therefore we
           * must scan the partition of this Sort=0 entry and find the highest occupied number.
           */
          getAllPartitionSequenceNrs(persistenceId, start).flatMap { result =>
            if (result.getItems.isEmpty) {
              /*
               * If this comes back empty then that means that all events have been deleted. The only
               * reliable way to obtain the previously highest number is to also read the lowest number
               * (which is always stored in full), knowing that it will be either highest-1 or zero.
               */
              readSequenceNr(persistenceId, highest = false).map { lowest =>
                val ret = Math.max(start, lowest - 1)
                log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
                ret
              }
            } else if (Fixes.HighDistrust) { // allows recovering from failed high mark setting
              /*
               * Keeps chasing the event source tail: as long as a partition is filled up to its last
               * slot (partitionMax == PartitionSize - 1) the next partition is queried as well.
               */
              def tailChase(partitionStart: Long, nextResults: QueryResult): Future[Long] = {
                if (nextResults.getItems.isEmpty) {
                  // The first iteration never gets here, as the initial query result is not empty.
                  // An empty follow-up result means the highest observed number is partitionStart - 1.
                  Future.successful(partitionStart - 1)
                } else {
                  /*
                   * `partitionStart` is the Sort=0 entry's sequence number, so add the maximum sort key.
                   */
                  val partitionMax = nextResults.getItems.asScala.map(_.get(Sort).getN.toLong).max
                  val ret = partitionStart + partitionMax
                  if (partitionMax == PartitionSize - 1) {
                    val nextStart = ret + 1
                    getAllPartitionSequenceNrs(persistenceId, nextStart)
                      .map { logResult =>
                        if (!logResult.getItems().isEmpty()) // will only log if a follow-up query produced results
                          log.warning(
                            "readSequenceNr(highest=true persistenceId={}) tail found after {}",
                            persistenceId,
                            ret)
                        logResult
                      }
                      .flatMap(tailChase(nextStart, _))
                  } else
                    Future.successful(ret)
                }
              }
              tailChase(start, result).map { ret =>
                log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
                ret
              }
            } else {
              /*
               * `start` is the Sort=0 entry's sequence number, so add the maximum sort key.
               */
              val ret = start + result.getItems.asScala.map(_.get(Sort).getN.toLong).max
              log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
              Future.successful(ret)
            }
          }
        } else {
          log.debug("readSequenceNr(highest=false persistenceId={}) = {}", persistenceId, start)
          Future.successful(start)
        }
    }
  }
}