in core/src/main/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala [489:534]
/**
 * Runs the `currentEventsByTag` query for `tag`, starting from `offset`, and emits the
 * result as `EventEnvelope`s.
 *
 * The rows are produced by `currentEventsByTagInternal`; each row is passed to
 * `toEventEnvelope` together with the row's UUID wrapped in a `TimeBasedUUID`, which
 * becomes the envelope offset. `mapConcat` is used, so a single row contributes whatever
 * collection of envelopes `toEventEnvelope` returns for it.
 *
 * The resulting stream is named `currentEventsByTag-<url-encoded tag>` so that the stage
 * name seen in diagnostics identifies this query rather than carrying the generic
 * `eventsByTag-` prefix.
 *
 * @param tag    the tag to query; it is URL-encoded (UTF-8) when used in the stream name
 * @param offset the offset to start from, interpreted by `currentEventsByTagInternal`
 * @return a source of event envelopes for the tag
 */
override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
  currentEventsByTagInternal(tag, offset)
    .mapConcat(r => toEventEnvelope(r.persistentRepr, TimeBasedUUID(r.offset)))
    .named("currentEventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8))
/**
 * INTERNAL API
 *
 * Builds the source of `UUIDPersistentRepr` rows for `tag`, bounded at the top by the
 * current wall-clock millisecond.
 *
 * When events-by-tag is disabled in `eventsByTagSettings` the returned source fails
 * immediately with an `IllegalStateException`. Any non-fatal exception thrown while the
 * source is being assembled is logged at debug level and returned as a failed source
 * instead of being thrown to the caller.
 *
 * @param tag    the tag to query
 * @param offset the starting offset, translated by `offsetToInternalOffset`
 * @return a source of deserialized tagged rows, running on the plugin dispatcher
 */
@InternalApi private[pekko] def currentEventsByTagInternal(
    tag: String,
    offset: Offset): Source[UUIDPersistentRepr, NotUsed] =
  if (eventsByTagSettings.eventsByTagEnabled) {
    try {
      val (fromOffset, usingOffset) = offsetToInternalOffset(offset)
      val prerequisites = eventsByTagPrereqs(tag, usingOffset, fromOffset)
      // Upper bound of the query: chosen so that all the events written this
      // millisecond are still picked up.
      val upperBound = Some(Uuids.endOf(System.currentTimeMillis()))

      val tagRows = createFutureSource(prerequisites) {
        case (underlying, (statements, initialTagPidSequenceNrs, scanner)) =>
          val tagSession = new TagStageSession(
            tag,
            querySettings.readProfile,
            underlying,
            statements,
            eventsByTagSettings.retrySettings)
          Source.fromGraph(
            EventsByTagStage(
              tagSession,
              fromOffset,
              upperBound,
              settings,
              None,
              eventsByTagSettings.bucketSize,
              usingOffset,
              initialTagPidSequenceNrs,
              scanner))
      }

      tagRows
        .via(deserializeEventsByTagRow)
        .withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher))
        .mapMaterializedValue(_ => NotUsed)
    } catch {
      case NonFatal(e) =>
        // Assembling the source can itself throw, e.g. from cassandraSession or
        // selectStatement; surface that through the stream rather than the call site.
        log.debug("Could not run currentEventsByTag [{}] query, due to: {}", tag, e.getMessage)
        Source.failed(e)
    }
  } else {
    Source.failed(new IllegalStateException("Events by tag queries are disabled"))
  }