def migrateAll()

in migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala [150:194]


  def migrateAll(): Future[Result] = {
    log.info("Migration started.")
    val result =
      sourcePersistenceIdsQuery
        .currentPersistenceIds()
        .mapAsyncUnordered(parallelism) { persistenceId =>
          for {
            _ <- createProgressTable
            currentProgress <- migrationDao.currentProgress(persistenceId)
            eventCount <- migrateEvents(persistenceId, currentProgress)
            snapshotCount <- migrateSnapshot(persistenceId, currentProgress)
          } yield persistenceId -> Result(1, eventCount, snapshotCount)
        }
        .map { case (pid, result @ Result(_, events, snapshots)) =>
          log.debug(
            "Migrated persistenceId [{}] with [{}] events{}.",
            pid,
            events: java.lang.Long,
            if (snapshots == 0) "" else " and snapshot")
          result
        }
        .runWith(Sink.fold(Result.empty) { case (acc: Result, Result(_, events, snapshots)) =>
          val result = Result(acc.persistenceIds + 1, acc.events + events, acc.snapshots + snapshots)
          if (result.persistenceIds % 100 == 0)
            log.info(
              "Migrated [{}] persistenceIds with [{}] events and [{}] snapshots.",
              result.persistenceIds: java.lang.Long,
              result.events: java.lang.Long,
              result.snapshots: java.lang.Long)
          result
        })

    result.transform {
      case s @ Success(Result(persistenceIds, events, snapshots)) =>
        log.info(
          "Migration successful. Migrated [{}] persistenceIds with [{}] events and [{}] snapshots.",
          persistenceIds: java.lang.Long,
          events: java.lang.Long,
          snapshots: java.lang.Long)
        s
      case f @ Failure(exc) =>
        log.error("Migration failed.", exc)
        f
    }
  }