in example/src/main/scala/org/apache/pekko/persistence/cassandra/example/ReadSide.scala [52:99]
/**
 * Builds the behavior of a single read-side processor.
 *
 * The processor handles the tags `tag-<i>` for every `i` from `nr * tagsPerProcessor` to
 * `(nr + 1) * tagsPerProcessor - 1` (inclusive) and starts one [[EventProcessorStream]] per tag.
 * On each `ReportMetrics` tick it publishes the histogram's total count, max value, 99th and
 * 50th percentile to `topic` and then resets the histogram; nothing is published while the
 * histogram is empty.
 *
 * @param topic      topic actor that receives the published `ReadSideMetrics`
 * @param nr         index of this processor; selects its tag range and is embedded in the
 *                   `processor-$nr` string passed to each stream
 * @param settings   supplies `tagsPerProcessor`
 * @param killSwitch handed to every query stream and shut down when this actor stops
 * @return the behavior handling `ReportMetrics` messages and the `PostStop` signal
 */
private def behavior(
    topic: ActorRef[Topic.Command[ReadSideTopic.ReadSideMetrics]],
    nr: Int,
    settings: Settings,
    killSwitch: SharedKillSwitch): Behavior[Command] =
  Behaviors.withTimers { timers =>
    // Tick every 10 seconds to trigger a metrics report.
    timers.startTimerAtFixedRate(ReportMetrics, 10.second)

    Behaviors.setup { ctx =>
      val firstTag = settings.tagsPerProcessor * nr
      val lastTag = firstTag + settings.tagsPerProcessor - 1
      val tags = (firstTag to lastTag).map(idx => s"tag-$idx")
      ctx.log.info("Processor {} processing tags {}", nr, tags)

      // Upper bound is 10 * 1000 * 60 = 600,000 with 2 significant digits. If values are
      // recorded in milliseconds (as the earlier note here stated) that is 10 minutes, not
      // 1 minute.
      // NOTE(review): this instance is also handed to every query stream below; if the streams
      // record into it outside this actor while the actor reads/resets it, that access is
      // unsynchronized — confirm against EventProcessorStream.runQueryStream.
      val histogram = new Histogram(10 * 1000 * 60, 2)

      // Publishes the current snapshot to the topic and starts a fresh measurement window.
      def publishAndReset(): Unit = {
        topic ! Topic.Publish(
          ReadSideTopic.ReadSideMetrics(
            histogram.getTotalCount,
            histogram.getMaxValue,
            histogram.getValueAtPercentile(99),
            histogram.getValueAtPercentile(50)))
        histogram.reset()
      }

      // Design note: it may be easier to run these as separate actors. The idea is to start
      // with a large number of tags and scale out the read-side processors later. More tags
      // also affect write throughput/latency because writes go to many partitions; the
      // downside is running many streams/queries against Cassandra.
      for (tag <- tags) {
        val stream = new EventProcessorStream[ConfigurablePersistentActor.Event](
          ctx.system,
          ctx.executionContext,
          s"processor-$nr",
          tag)
        stream.runQueryStream(killSwitch, histogram)
      }

      Behaviors
        .receiveMessage[Command] { case ReportMetrics =>
          // Skip empty windows so no all-zero metrics are published.
          if (histogram.getTotalCount > 0) publishAndReset()
          Behaviors.same
        }
        .receiveSignal { case (_, PostStop) =>
          // Shut down the kill switch that was given to the query streams.
          killSwitch.shutdown()
          Behaviors.same
        }
    }
  }