in core/src/main/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala [303:348]
override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = {
  // Each internal row is expanded into its event envelope(s), keyed by the row's time-based UUID offset.
  val envelopes = eventsByTagInternal(tag, offset).mapConcat { row =>
    toEventEnvelope(row.persistentRepr, TimeBasedUUID(row.offset))
  }
  // The stage name embeds the tag, URL-encoded as UTF-8.
  val stageName = "eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8)
  envelopes
    .mapMaterializedValue(_ => NotUsed)
    .named(stageName)
}
/**
 * INTERNAL API
 *
 * Builds the row-level source for a tag query starting at `offset`.
 *
 * The returned source fails with an `IllegalStateException` when events-by-tag is
 * disabled in the settings, and fails with the thrown exception when setting up the
 * query throws a non-fatal error.
 */
@InternalApi private[pekko] def eventsByTagInternal(
    tag: String, offset: Offset): Source[UUIDPersistentRepr, NotUsed] =
  if (eventsByTagSettings.eventsByTagEnabled) {
    try {
      val (fromOffset, usingOffset) = offsetToInternalOffset(offset)
      val prereqs = eventsByTagPrereqs(tag, usingOffset, fromOffset)
      // The stage is only created once the prerequisites are available.
      val rows = createFutureSource(prereqs) {
        case (cqlSession, (preparedStatements, tagPidSequenceNrs, tagScanner)) =>
          val tagSession = new TagStageSession(
            tag,
            querySettings.readProfile,
            cqlSession,
            preparedStatements,
            eventsByTagSettings.retrySettings)
          val stage = EventsByTagStage(
            tagSession,
            fromOffset,
            None,
            settings,
            Some(querySettings.refreshInterval),
            eventsByTagSettings.bucketSize,
            usingOffset,
            tagPidSequenceNrs,
            tagScanner)
          Source.fromGraph(stage)
      }
      rows
        .via(deserializeEventsByTagRow)
        .withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher))
        .mapMaterializedValue(_ => NotUsed)
    } catch {
      case NonFatal(cause) =>
        // e.g. from cassandraSession, or selectStatement
        log.debug("Could not run eventsByTag [{}] query, due to: {}", tag, cause.getMessage)
        Source.failed(cause)
    }
  } else {
    Source.failed(
      new IllegalStateException(
        "Events by tag queries are disabled with configuration " +
        "events-by-tag.enabled=off"))
  }