def add()

in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala [90:130]


    /**
     * Folds `records` into this state, one record at a time in iteration order, and returns the
     * resulting state.
     *
     * For every record:
     *  - `byPid` is updated when there is no entry for the record's pid yet, or when the record has a
     *    strictly higher `seqNr` than the stored entry.
     *  - `latest` is reset to just this record when its timestamp is after `latestTimestamp`. When the
     *    timestamp is equal, the record is appended, replacing any entry with the same pid unless
     *    that entry has a higher `seqNr`. Records with an older timestamp leave `latest` untouched.
     *  - `oldestTimestamp` becomes the record's timestamp when it is still `Instant.EPOCH` (no record
     *    seen so far) or when the record is older than the current value.
     *
     * @param records the records to add
     * @return the state after all records have been applied
     */
    def add(records: immutable.IndexedSeq[Record]): State =
      records.foldLeft(this) { (state, record) =>
        // An absent entry, or one with a strictly lower seqNr, is replaced; same or higher seqNr wins.
        val supersedesKnown =
          state.byPid.get(record.pid).forall(known => record.seqNr > known.seqNr)
        val updatedByPid =
          if (supersedesKnown) state.byPid.updated(record.pid, record)
          else state.byPid

        val currentLatestTimestamp = state.latestTimestamp
        val updatedLatest =
          if (record.timestamp.isAfter(currentLatestTimestamp))
            // strictly newer timestamp: it alone forms the new latest set
            Vector(record)
          else if (record.timestamp == currentLatestTimestamp)
            state.latest.find(_.pid == record.pid) match {
              // an entry for the same pid with a higher seqNr is kept as is
              case Some(samePid) if record.seqNr < samePid.seqNr => state.latest
              // same or lower seqNr stored: drop it and append the incoming record
              case Some(_) => state.latest.filterNot(_.pid == record.pid) :+ record
              case None    => state.latest :+ record
            }
          else
            // older than the current latest timestamp
            state.latest

        // Instant.EPOCH marks that no record has been seen yet, so the first record always sets it.
        val updatedOldestTimestamp =
          if (state.oldestTimestamp == Instant.EPOCH || record.timestamp.isBefore(state.oldestTimestamp))
            record.timestamp
          else
            state.oldestTimestamp // the normal case

        state.copy(byPid = updatedByPid, latest = updatedLatest, oldestTimestamp = updatedOldestTimestamp)
      }