in core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala [71:112]
/**
 * Factory for [[EventsByTagStage]].
 *
 * Every argument is handed to the class constructor unchanged and in the same
 * order as declared here; no validation or defaulting happens at this level.
 */
def apply(
    session: TagStageSession,
    fromOffset: UUID,
    toOffset: Option[UUID],
    settings: PluginSettings,
    refreshInterval: Option[FiniteDuration],
    bucketSize: BucketSize,
    usingOffset: Boolean,
    initialTagPidSequenceNrs: Map[Tag, (TagPidSequenceNr, UUID)],
    scanner: TagViewSequenceNumberScanner): EventsByTagStage = {
  // Positional arguments are kept (rather than named ones) so this stays in
  // lock-step with the constructor's declared parameter order.
  val stage = new EventsByTagStage(
    session,
    fromOffset,
    toOffset,
    settings,
    refreshInterval,
    bucketSize,
    usingOffset,
    initialTagPidSequenceNrs,
    scanner)
  stage
}
/**
 * Bundles a `CqlSession` with the prepared statements, read execution profile
 * and retry settings used to query events for a single `tag`.
 */
private[pekko] class TagStageSession(
    val tag: String,
    readProfile: String,
    session: CqlSession,
    statements: EventByTagStatements,
    retries: RetrySettings) {

  /**
   * Asynchronously runs the `byTagWithUpperLimit` statement for one time bucket,
   * wrapped in [[Retries.retry]].
   *
   * The statement is bound with, in order: this session's `tag`, the bucket key
   * (boxed to a `JLong`), `from` and `to`, and is executed under `readProfile`.
   *
   * @param bucket    time bucket whose `key` is bound as the second statement value
   * @param from      offset bound as the third statement value
   *                  (NOTE(review): presumably the lower bound; inclusivity is
   *                  decided by the CQL statement, not visible here)
   * @param to        offset bound as the fourth statement value
   *                  (NOTE(review): presumably the upper bound, see above)
   * @param onFailure callback forwarded to [[Retries.retry]]
   *                  (NOTE(review): presumably invoked per failed attempt with the
   *                  attempt number, the cause and the next delay; confirm in `Retries`)
   * @return the future produced by [[Retries.retry]] for the query
   */
  def selectEventsForBucket(
      bucket: TimeBucket,
      from: UUID,
      to: UUID,
      onFailure: (Int, Throwable, FiniteDuration) => Unit)(
      implicit ec: ExecutionContext,
      scheduler: Scheduler): Future[AsyncResultSet] = {
    // One attempt of the query. Binding happens inside the thunk so that every
    // invocation by the retry helper builds and executes a fresh bound statement.
    val runQuery = () => {
      val query = statements.byTagWithUpperLimit
        .bind(tag, bucket.key: JLong, from, to)
        .setExecutionProfileName(readProfile)
      session.executeAsync(query).asScala
    }

    Retries.retry(
      runQuery,
      retries.retries,
      onFailure,
      retries.minDuration,
      retries.maxDuration,
      retries.randomFactor)
  }
}