private def eventsByTag()

in core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala [230:280]


  private def eventsByTag(
      tag: String,
      offset: Long,
      terminateAfterOffset: Option[Long]): Source[EventEnvelope, NotUsed] = {
    import pekko.pattern.ask
    import FlowControl._
    implicit val askTimeout: Timeout = Timeout(readJournalConfig.journalSequenceRetrievalConfiguration.askTimeout)
    val batchSize = readJournalConfig.maxBufferSize

    Source
      .unfoldAsync[(Long, FlowControl), Seq[EventEnvelope]]((offset, Continue)) { case (from, control) =>
        def retrieveNextBatch() = {
          for {
            queryUntil <- journalSequenceActor.ask(GetMaxOrderingId).mapTo[MaxOrderingId]
            xs <- currentJournalEventsByTag(tag, from, batchSize, queryUntil).runWith(Sink.seq)
          } yield {
            val hasMoreEvents = xs.size == batchSize
            val nextControl: FlowControl =
              terminateAfterOffset match {
                // we may stop if target is behind queryUntil and we don't have more events to fetch
                case Some(target) if !hasMoreEvents && target <= queryUntil.maxOrdering => Stop
                // We may also stop if we have found an event with an offset >= target
                case Some(target) if xs.exists(_.offset.value >= target) => Stop

                // otherwise, disregarding if Some or None, we must decide how to continue
                case _ =>
                  if (hasMoreEvents) Continue else ContinueDelayed
              }

            val nextStartingOffset = if (xs.isEmpty) {
              /* If no events matched the tag between `from` and `maxOrdering` then there is no need to execute the exact
               * same query again. We can continue querying from `maxOrdering`, which will save some load on the db.
               * (Note: we may never return a value smaller than `from`, otherwise we might return duplicate events) */
              math.max(from, queryUntil.maxOrdering)
            } else {
              // Continue querying from the largest offset
              xs.map(_.offset.value).max
            }
            Some(((nextStartingOffset, nextControl), xs))
          }
        }

        control match {
          case Stop     => Future.successful(None)
          case Continue => retrieveNextBatch()
          case ContinueDelayed =>
            pekko.pattern.after(readJournalConfig.refreshInterval, system.scheduler)(retrieveNextBatch())
        }
      }
      .mapConcat(identity)
  }