def readSequenceNr()

in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBRecovery.scala [294:374]


  def readSequenceNr(persistenceId: String, highest: Boolean): Future[Long] = {
    if (Tracing) log.debug("readSequenceNr(highest={}, persistenceId={})", highest, persistenceId)
    val keyGroups = readSequenceNrBatches(persistenceId, highest).map(_.map(getMaxSeqNr).recover {
      case ex: Throwable => -1L
    })
    Future.sequence(keyGroups).flatMap { seq =>
      seq.max match {
        case -1L =>
          val highOrLow = if (highest) "highest" else "lowest"
          throw new DynamoDBJournalFailure(s"cannot read $highOrLow sequence number for persistenceId $persistenceId")
        case start =>
          if (highest) {
            /*
             * When reading the highest sequence number the stored value always points to the Sort=0 entry
             * for which it was written, all other entries do not update the highest value. Therefore we
             * must scan the partition of this Sort=0 entry and find the highest occupied number.
             */
            getAllPartitionSequenceNrs(persistenceId, start).flatMap { result =>
              if (result.getItems.isEmpty) {
                /*
                 * If this comes back empty then that means that all events have been deleted. The only
                 * reliable way to obtain the previously highest number is to also read the lowest number
                 * (which is always stored in full), knowing that it will be either highest-1 or zero.
                 */
                readSequenceNr(persistenceId, highest = false).map { lowest =>
                  val ret = Math.max(start, lowest - 1)
                  log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
                  ret
                }
              } else if (Fixes.HighDistrust) { // allows recovering from failed high mark setting
                // this function will keep on chasing the event source tail
                // if HighDistrust is enabled and as long as the partitionMax == PartitionSize - 1
                def tailChase(partitionStart: Long, nextResults: QueryResult): Future[Long] = {
                  if (nextResults.getItems.isEmpty) {
                    // first iteraton will not pass here, as the query result is not empty
                    // if the new query result is empty the highest observed is partition -1
                    Future.successful(partitionStart - 1)
                  } else {
                    /*
                     * `partitionStart` is the Sort=0 entry’s sequence number, so add the maximum sort key.
                     */
                    val partitionMax = nextResults.getItems.asScala.map(_.get(Sort).getN.toLong).max
                    val ret = partitionStart + partitionMax

                    if (partitionMax == PartitionSize - 1) {
                      val nextStart = ret + 1
                      getAllPartitionSequenceNrs(persistenceId, nextStart)
                        .map { logResult =>
                          if (!logResult.getItems().isEmpty()) // will only log if a follow-up query produced results
                            log.warning(
                              "readSequenceNr(highest=true persistenceId={}) tail found after {}",
                              persistenceId,
                              ret)
                          logResult
                        }
                        .flatMap(tailChase(nextStart, _))
                    } else
                      Future.successful(ret)
                  }
                }

                tailChase(start, result).map { ret =>
                  log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
                  ret
                }
              } else {
                /*
                 * `start` is the Sort=0 entry’s sequence number, so add the maximum sort key.
                 */
                val ret = start + result.getItems.asScala.map(_.get(Sort).getN.toLong).max
                log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
                Future.successful(ret)
              }
            }
          } else {
            log.debug("readSequenceNr(highest=false persistenceId={}) = {}", persistenceId, start)
            Future.successful(start)
          }
      }
    }
  }