in core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala [124:159]
/**
 * Tries to load the snapshots referenced by `metadata` one at a time, in the given order, and
 * completes with the first one that loads successfully.
 *
 *  - Empty `metadata` completes with `None`.
 *  - A `NoSuchElementException` for the only remaining entry is treated as "no snapshot" (`None`).
 *  - Any other failure is logged as a warning; while entries remain the next one is tried,
 *    otherwise the returned future fails with the error of the last attempt.
 *
 * @param metadata candidate snapshots; the head is attempted first and the tail is used as fallback
 * @return the first snapshot that could be loaded, `None` if there is nothing to load
 */
private def loadNAsync(metadata: immutable.Seq[SnapshotMetadata]): Future[Option[SelectedSnapshot]] = metadata match {
  case Seq() => Future.successful(None) // no snapshots stored
  case md +: mds =>
    load1Async(md)
      .map {
        case DeserializedSnapshot(payload, OptionVal.None) =>
          Some(SelectedSnapshot(md, payload))
        case DeserializedSnapshot(payload, some) =>
          // The stored snapshot carries its own metadata; attach it to the selected entry.
          Some(SelectedSnapshot(md.withMetadata(some.x), payload))
      }
      .recoverWith {
        case _: NoSuchElementException if metadata.size == 1 =>
          // Thrown by load1Async when the snapshot couldn't be found, which can happen since metadata
          // and the actual snapshot might not be replicated at exactly the same time.
          // Treat this as if there were no snapshots.
          Future.successful(None)
        case e =>
          // The metadata is passed as a log argument instead of being interpolated into the
          // template: its string form may itself contain "{}" (e.g. in the persistence id), which
          // would otherwise be mistaken for a placeholder and shift the remaining arguments.
          // More than four arguments are supplied to the logger as a single array.
          if (mds.isEmpty) {
            log.warning(
              "Failed to load snapshot [{}] ({} of {}), last attempt. Caused by: [{}: {}]",
              Array[Any](
                md,
                snapshotSettings.maxLoadAttempts,
                snapshotSettings.maxLoadAttempts,
                e.getClass.getName,
                e.getMessage))
            Future.failed(e) // all attempts failed
          } else {
            log.warning(
              "Failed to load snapshot [{}] ({} of {}), trying older one. Caused by: [{}: {}]",
              Array[Any](
                md,
                snapshotSettings.maxLoadAttempts - mds.size,
                snapshotSettings.maxLoadAttempts,
                e.getClass.getName,
                e.getMessage))
            loadNAsync(mds) // try older snapshot
          }
      }
}