def apply()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala [71:112]


  /**
   * Factory for [[EventsByTagStage]].
   *
   * Every argument is handed to the stage's constructor unchanged and in the same order in which
   * it is declared here; no validation or defaulting is performed by this method.
   *
   * NOTE(review): the meaning of the individual parameters is defined by the `EventsByTagStage`
   * constructor, which is not visible from this method - consult it before relying on any
   * particular semantics (e.g. whether `toOffset = None` means "unbounded").
   */
  def apply(
      session: TagStageSession,
      fromOffset: UUID,
      toOffset: Option[UUID],
      settings: PluginSettings,
      refreshInterval: Option[FiniteDuration],
      bucketSize: BucketSize,
      usingOffset: Boolean,
      initialTagPidSequenceNrs: Map[Tag, (TagPidSequenceNr, UUID)],
      scanner: TagViewSequenceNumberScanner): EventsByTagStage = {
    // Positional pass-through: keep this list in the same order as the parameter list above.
    new EventsByTagStage(
      session, fromOffset, toOffset, settings, refreshInterval, bucketSize, usingOffset,
      initialTagPidSequenceNrs, scanner)
  }

  /**
   * Bundles what is needed to query events for a single tag: the tag itself, the name of the
   * execution profile applied to read statements, the Cassandra session, the prepared
   * events-by-tag statements and the retry settings used when a query is attempted.
   */
  private[pekko] class TagStageSession(
      val tag: String,
      readProfile: String,
      session: CqlSession,
      statements: EventByTagStatements,
      retries: RetrySettings) {

    /**
     * Executes the `byTagWithUpperLimit` statement for this session's tag, bound to the key of
     * `bucket` and to the `from` / `to` UUIDs, using the `readProfile` execution profile.
     *
     * The query is run through `Retries.retry`, configured from this session's `RetrySettings`
     * (`retries`, `minDuration`, `maxDuration`, `randomFactor`).
     *
     * NOTE(review): whether `from` / `to` are inclusive or exclusive bounds is decided by the
     * prepared statement, which is not visible here - confirm against `EventByTagStatements`.
     *
     * @param bucket    time bucket whose `key` is bound into the statement
     * @param from      second UUID bound into the statement
     * @param to        third UUID bound into the statement
     * @param onFailure callback handed to `Retries.retry`; presumably invoked per failed attempt
     *                  with the attempt number, the cause and the next delay - verify in `Retries`
     * @return the future produced by `Retries.retry` for the executed query
     */
    def selectEventsForBucket(
        bucket: TimeBucket,
        from: UUID,
        to: UUID,
        onFailure: (Int, Throwable, FiniteDuration) => Unit)(
        implicit ec: ExecutionContext,
        scheduler: Scheduler): Future[AsyncResultSet] = {
      // Binding and execution both live inside the thunk, so every invocation of it issues a
      // freshly bound statement rather than reusing one from a previous attempt.
      val runQuery: () => Future[AsyncResultSet] = () => {
        // The driver's `bind` takes objects, hence the explicit boxing to java.lang.Long.
        val bucketKey: JLong = bucket.key
        val query = statements.byTagWithUpperLimit
          .bind(tag, bucketKey, from, to)
          .setExecutionProfileName(readProfile)
        session.executeAsync(query).asScala
      }

      Retries.retry(
        runQuery,
        retries.retries,
        onFailure,
        retries.minDuration,
        retries.maxDuration,
        retries.randomFactor)
    }
  }