in migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala [150:194]
/**
 * Migrates the events and the snapshot of every persistence id emitted by
 * `sourcePersistenceIdsQuery.currentPersistenceIds()`.
 *
 * Up to `parallelism` persistence ids are processed concurrently and their results are
 * consumed in completion order. Each one waits for `createProgressTable`, reads its
 * current progress from `migrationDao`, then runs `migrateEvents` followed by
 * `migrateSnapshot`.
 *
 * Logging: one debug line per persistence id, one info line for every 100 persistence
 * ids, and a final info (success) or error (failure) line.
 *
 * @return a future holding the accumulated [[Result]] (persistence ids, events and
 *         snapshots migrated); it fails with the original exception if the stream fails
 */
def migrateAll(): Future[Result] = {
  log.info("Migration started.")

  // Stage 1: migrate each persistence id, pairing the id with its own counts.
  val migratedPerId =
    sourcePersistenceIdsQuery
      .currentPersistenceIds()
      .mapAsyncUnordered(parallelism) { pid =>
        for {
          _ <- createProgressTable
          progress <- migrationDao.currentProgress(pid)
          migratedEvents <- migrateEvents(pid, progress)
          migratedSnapshots <- migrateSnapshot(pid, progress)
        } yield pid -> Result(1, migratedEvents, migratedSnapshots)
      }

  // Stage 2: trace every finished persistence id and drop the id from the element.
  val tracedResults =
    migratedPerId.map { case (pid, migrated) =>
      val snapshotSuffix = if (migrated.snapshots == 0) "" else " and snapshot"
      log.debug(
        "Migrated persistenceId [{}] with [{}] events{}.",
        pid,
        Long.box(migrated.events),
        snapshotSuffix)
      migrated
    }

  // Stage 3: sum everything up. Each element counts as exactly one persistence id,
  // regardless of the persistenceIds value carried by the element itself.
  val summingSink =
    Sink.fold[Result, Result](Result.empty) { (acc, migrated) =>
      val updated =
        Result(acc.persistenceIds + 1, acc.events + migrated.events, acc.snapshots + migrated.snapshots)
      // Periodic progress report.
      if (updated.persistenceIds % 100 == 0)
        log.info(
          "Migrated [{}] persistenceIds with [{}] events and [{}] snapshots.",
          Long.box(updated.persistenceIds),
          Long.box(updated.events),
          Long.box(updated.snapshots))
      updated
    }

  val totals = tracedResults.runWith(summingSink)

  // Report the outcome and hand the very same Try back, so the result is unchanged.
  totals.transform {
    case completed @ Success(total) =>
      log.info(
        "Migration successful. Migrated [{}] persistenceIds with [{}] events and [{}] snapshots.",
        Long.box(total.persistenceIds),
        Long.box(total.events),
        Long.box(total.snapshots))
      completed
    case failed @ Failure(cause) =>
      log.error("Migration failed.", cause)
      failed
  }
}