in migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala [207:238]
private def migrateEvents(persistenceId: String, currentProgress: Option[CurrentProgress]): Future[Long] = {
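  // resume after the last recorded event seqNr, or from the beginning when no progress was stored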
  val progressSeqNr = currentProgress.map(_.eventSeqNr).getOrElse(0L)
  sourceEventsByPersistenceIdQuery
    .currentEventsByPersistenceId(persistenceId, progressSeqNr + 1, Long.MaxValue)
    .map(serializedJournalRow)
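    // write in batches of targetBatch; mapAsync(1) processes one batch at a time, keeping seqNr order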
    .grouped(targetBatch)
    .mapAsync(1) { events =>
      targetJournalDao
        .writeEvents(events)
        .recoverWith { case _: R2dbcDataIntegrityViolationException =>
          // the events already exist, which is ok, but since the whole batch
          // failed we must retry them one-by-one
          Future.sequence(events.map { event =>
            targetJournalDao
              .writeEvents(List(event))
              .recoverWith { case _: R2dbcDataIntegrityViolationException =>
                // ok, already exists
                log
                  .debug("event already exists, persistenceId [{}], seqNr [{}]", event.persistenceId, event.seqNr)
                Future.successful(())
              }
          })
        }
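        // emit the highest seqNr in the batch together with the number of events written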
        .map(_ => events.last.seqNr -> events.size)
    }
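    // record progress after each batch so an interrupted migration can resume where it left off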
    .mapAsync(1) { case (seqNr, count) =>
      migrationDao
        .updateEventProgress(persistenceId, seqNr)
        .map(_ => count)
    }
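    // total number of events migrated for this persistenceId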
    .runWith(Sink.fold(0L) { case (acc, count) => acc + count })
}
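
// The duplicate-handling fallback above is a reusable pattern. Below is a minimal,
// self-contained sketch of it (an illustration, not part of MigrationTool): attempt the
// cheap batched insert first, and only when a data-integrity violation fails the whole
// batch, retry idempotently one-by-one. `insertBatch`, `insertOne` and
// `DuplicateKeyException` are hypothetical stand-ins for the real DAO and driver types.

import scala.concurrent.{ ExecutionContext, Future }

object BatchWriteWithFallback {
  final class DuplicateKeyException(msg: String) extends RuntimeException(msg)

  def writeWithFallback[A](
      events: Seq[A],
      insertBatch: Seq[A] => Future[Unit],
      insertOne: A => Future[Unit])(implicit ec: ExecutionContext): Future[Unit] =
    insertBatch(events).recoverWith { case _: DuplicateKeyException =>
      // at least one event already exists, so the batch was rejected as a whole;
      // insert individually and treat duplicates as success
      Future
        .sequence(events.map(event => insertOne(event).recover { case _: DuplicateKeyException => () }))
        .map(_ => ())
    }
}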