private def toSnapshotData()

in migrator/src/main/scala/org/apache/pekko/persistence/jdbc/migrator/SnapshotMigrator.scala [70:88]


  /**
   * Converts a stored snapshot row into its metadata and snapshot payload.
   *
   * @param row the snapshot row to deserialize
   * @return the pair of snapshot metadata and the deserialized snapshot value
   */
  private def toSnapshotData(row: SnapshotRow): (SnapshotMetadata, Any) = {
    val deserialized = serializer.deserialize(row)
    // NOTE(review): `.get` presumably rethrows the deserialization failure
    // (if `deserialize` returns a Try) -- confirm against the serializer.
    deserialized.get
  }

  /**
   * Migrates the latest snapshot of every persistence id.
   *
   * For each persistence id emitted by the legacy journal DAO, the latest
   * snapshot row is selected, deserialized and saved through
   * `defaultSnapshotDao`. Persistence ids without a snapshot row are skipped.
   *
   * Each stream element completes only after its save has completed, so the
   * returned future reflects the outcome of the writes: it fails if a query,
   * a deserialization or a save fails.
   *
   * @return a future completed with `Done` once the stream has processed
   *         every persistence id
   */
  def migrateLatest(): Future[Done] = {
    legacyJournalDao
      .allPersistenceIdsSource(Long.MaxValue)
      .mapAsync(NoParallelism) { persistenceId =>
        // let us fetch the latest snapshot for each persistenceId
        snapshotDB.run(queries.selectLatestByPersistenceId(persistenceId).result).flatMap { rows =>
          rows.headOption.map(toSnapshotData) match {
            case Some((metadata, value)) =>
              log.debug(s"migrating snapshot for ${metadata.toString}")
              // flatMap (rather than map) so that mapAsync waits for the save
              // itself; otherwise the write would be fire-and-forget and its
              // failure would never reach the returned future.
              defaultSnapshotDao.save(metadata, value).map(_ => Done)
            case None =>
              // no snapshot stored for this persistenceId, nothing to migrate
              Future.successful(Done)
          }
        }
      }
      .runWith(Sink.ignore)
  }