override def currentEventsByTag()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala [489:534]


  override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
    currentEventsByTagInternal(tag, offset)
      .mapConcat(r => toEventEnvelope(r.persistentRepr, TimeBasedUUID(r.offset)))
      .named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8))

  /**
   * INTERNAL API
   */
  @InternalApi private[pekko] def currentEventsByTagInternal(
      tag: String,
      offset: Offset): Source[UUIDPersistentRepr, NotUsed] =
    if (!eventsByTagSettings.eventsByTagEnabled)
      Source.failed(new IllegalStateException("Events by tag queries are disabled"))
    else {
      try {
        val (fromOffset, usingOffset) = offsetToInternalOffset(offset)
        val prereqs = eventsByTagPrereqs(tag, usingOffset, fromOffset)
        // pick up all the events written this millisecond
        val toOffset = Some(Uuids.endOf(System.currentTimeMillis()))

        createFutureSource(prereqs) {
          case (s, (statements, initialTagPidSequenceNrs, scanner)) =>
            val session =
              new TagStageSession(tag, querySettings.readProfile, s, statements, eventsByTagSettings.retrySettings)
            Source.fromGraph(
              EventsByTagStage(
                session,
                fromOffset,
                toOffset,
                settings,
                None,
                eventsByTagSettings.bucketSize,
                usingOffset,
                initialTagPidSequenceNrs,
                scanner))
        }.via(deserializeEventsByTagRow)
          .withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher))
          .mapMaterializedValue(_ => NotUsed)

      } catch {
        case NonFatal(e) =>
          // e.g. from cassandraSession, or selectStatement
          log.debug("Could not run currentEventsByTag [{}] query, due to: {}", tag, e.getMessage)
          Source.failed(e)
      }
    }