private def migrateEvents()

in migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala [207:238]


  private def migrateEvents(persistenceId: String, currentProgress: Option[CurrentProgress]): Future[Long] = {
    val progressSeqNr = currentProgress.map(_.eventSeqNr).getOrElse(0L)
    sourceEventsByPersistenceIdQuery
      .currentEventsByPersistenceId(persistenceId, progressSeqNr + 1, Long.MaxValue)
      .map(serializedJournalRow)
      .grouped(targetBatch)
      .mapAsync(1) { events =>
        targetJournalDao
          .writeEvents(events)
          .recoverWith { case _: R2dbcDataIntegrityViolationException =>
            // events already exists, which is ok, but since the batch
            // failed we must try again one-by-one
            Future.sequence(events.map { event =>
              targetJournalDao
                .writeEvents(List(event))
                .recoverWith { case _: R2dbcDataIntegrityViolationException =>
                  // ok, already exists
                  log
                    .debug("event already exists, persistenceId [{}], seqNr [{}]", event.persistenceId, event.seqNr)
                  Future.successful(())
                }
            })
          }
          .map(_ => events.last.seqNr -> events.size)
      }
      .mapAsync(1) { case (seqNr, count) =>
        migrationDao
          .updateEventProgress(persistenceId, seqNr)
          .map(_ => count)
      }
      .runWith(Sink.fold(0L) { case (acc, count) => acc + count })
  }