def writeJournalRows()

in core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalQueries.scala [40:55]


  /**
   * Builds the database action that persists a batch of journal rows and, when present, their tags.
   *
   * The batch is first ordered by each row's `sequenceNumber`. If at least one entry carries a
   * non-empty tag set, the rows are inserted through `insertAndReturn`, the returned values are
   * paired positionally with the tag sets, and one `TagRow` per (returned value, tag) pair is
   * inserted into `TagTableC`. If no entry has tags, the rows are inserted into `JournalTableC`
   * directly and the tag table is not touched.
   *
   * NOTE(review): pairing via `zip` presumes `insertAndReturn` yields one value per inserted row,
   * in insertion order - confirm against its definition.
   *
   * @param xs journal rows, each paired with the set of tags to attach to it
   * @param ec execution context used to chain the row insert with the tag insert
   * @return an action performing the inserts; the tagged path yields `Unit`
   */
  def writeJournalRows(xs: Seq[(JournalPekkoSerializationRow, Set[String])])(implicit ec: ExecutionContext) = {
    val ordered = xs.sortBy { case (row, _) => row.sequenceNumber }
    val anyTagged = ordered.exists { case (_, rowTags) => rowTags.nonEmpty }
    if (anyTagged) {
      // tagged path: rows go in first so that their returned values can key the tag rows
      val (rows, tagSets) = ordered.unzip
      (insertAndReturn ++= rows).flatMap { returnedIds =>
        val tagRows = returnedIds.zip(tagSets).flatMap { case (rowId, rowTags) =>
          rowTags.map(tag => TagRow(rowId, tag))
        }
        (TagTableC ++= tagRows).map(_ => ())
      }
    } else {
      // untagged path: plain batch insert, skipping the returned values and the tag table
      JournalTableC ++= ordered.map { case (row, _) => row }
    }
  }