private def behavior()

in example/src/main/scala/org/apache/pekko/persistence/cassandra/example/ReadSide.scala [52:99]


  /**
   * Behavior of read-side processor number `nr`.
   *
   * During setup it starts one `EventProcessorStream` per tag owned by this processor
   * (`settings.tagsPerProcessor` consecutive tags, beginning at `settings.tagsPerProcessor * nr`).
   * All of those streams are given the same histogram and the same kill switch.
   *
   * A `ReportMetrics` timer message is scheduled at a fixed rate of 10 seconds. On each tick, if the
   * histogram holds at least one value, its total count, max value, 99th and 50th percentile are
   * published to `topic` and the histogram is reset; otherwise nothing is published.
   *
   * On `PostStop` the kill switch is shut down.
   *
   * @param topic      topic actor that receives the published `ReadSideTopic.ReadSideMetrics`
   * @param nr         index of this processor; determines which tags it handles
   * @param settings   provides `tagsPerProcessor`
   * @param killSwitch handed to every started stream and shut down when this actor stops
   * @return the processor's behavior
   */
  private def behavior(
      topic: ActorRef[Topic.Command[ReadSideTopic.ReadSideMetrics]],
      nr: Int,
      settings: Settings,
      killSwitch: SharedKillSwitch): Behavior[Command] =
    Behaviors.withTimers { timers =>
      timers.startTimerAtFixedRate(ReportMetrics, 10.seconds)
      Behaviors.setup { ctx =>
        // Processor `nr` owns the inclusive tag index range [firstTag, lastTag].
        val firstTag = settings.tagsPerProcessor * nr
        val lastTag = firstTag + settings.tagsPerProcessor - 1
        val tags = (firstTag to lastTag).map(i => s"tag-$i")
        ctx.log.info("Processor {} processing tags {}", nr, tags)

        // Values are in milliseconds. The first argument is the highest value:
        // 10 * 1000 * 60 ms = 10 minutes.
        // The second argument is presumably the number of significant value digits — TODO confirm.
        // NOTE(review): this single instance is passed to every stream below and is also read and
        // reset by this actor; if the streams record from other threads, confirm that the
        // Histogram implementation in use is safe for that.
        val histogram = new Histogram(10 * 1000 * 60, 2)

        // Design note: running each tag as its own actor might be simpler. The idea here is to
        // start with a large number of tags and scale out the read side processors later.
        // More tags also affects write throughput/latency, since writes go to many partitions.
        // The downside is running many streams/queries against Cassandra.
        for (tag <- tags) {
          val stream = new EventProcessorStream[ConfigurablePersistentActor.Event](
            ctx.system,
            ctx.executionContext,
            s"processor-$nr",
            tag)
          stream.runQueryStream(killSwitch, histogram)
        }

        // Publishes the current histogram summary to the topic, then clears the histogram so the
        // next report only covers values recorded after this point.
        def publishAndReset(): Unit = {
          val metrics = ReadSideTopic.ReadSideMetrics(
            histogram.getTotalCount,
            histogram.getMaxValue,
            histogram.getValueAtPercentile(99),
            histogram.getValueAtPercentile(50))
          topic ! Topic.Publish(metrics)
          histogram.reset()
        }

        Behaviors
          .receiveMessage[Command] {
            case ReportMetrics =>
              // Skip the report entirely when nothing has been recorded since the last reset.
              val hasSamples = histogram.getTotalCount > 0
              if (hasSamples) publishAndReset()
              Behaviors.same
          }
          .receiveSignal {
            case (_, PostStop) =>
              // Stop the tag streams together with this actor.
              killSwitch.shutdown()
              Behaviors.same
          }
      }
    }