in migrator/src/main/scala/org/apache/pekko/persistence/jdbc/migrator/SnapshotMigrator.scala [70:88]
private def toSnapshotData(row: SnapshotRow): (SnapshotMetadata, Any) = serializer.deserialize(row).get
/**
* migrate the latest snapshot data
*/
def migrateLatest(): Future[Done] = {
legacyJournalDao
.allPersistenceIdsSource(Long.MaxValue)
.mapAsync(NoParallelism) { persistenceId =>
// let us fetch the latest snapshot for each persistenceId
snapshotDB.run(queries.selectLatestByPersistenceId(persistenceId).result).map { rows =>
rows.headOption.map(toSnapshotData).map { case (metadata, value) =>
log.debug(s"migrating snapshot for ${metadata.toString}")
defaultSnapshotDao.save(metadata, value)
}
}
}
.runWith(Sink.ignore)
}