def main()

in example/src/main/scala/org/apache/pekko/persistence/cassandra/example/Main.scala [26:75]


  /**
   * Application entry point: creates the "apc-example" actor system.
   *
   * The guardian behavior, while being set up:
   *  - reads the read-side, write-side and load-generator settings from the
   *    `cassandra.example` config section,
   *  - starts Pekko Management and Cluster Bootstrap,
   *  - subscribes itself to [[SelfUp]] cluster events,
   *  - on nodes with the "read" role, issues the `CREATE TABLE IF NOT EXISTS`
   *    statement for `pekko.offsetStore`, blocking for up to 30 seconds.
   *
   * When [[SelfUp]] is received it initializes the persistent actors, spawns a
   * `Reporter` on "read" nodes, invokes `ReadSide`, starts the load generator
   * on "load" nodes, and then switches to `Behaviors.empty`.
   *
   * @param args command line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    ActorSystem(Behaviors.setup[SelfUp] {
        ctx =>
          // Both the read side and the load generator are configured from the same section.
          val exampleConfig = ctx.system.settings.config.getConfig("cassandra.example")
          val readSettings = ReadSide.Settings(exampleConfig)
          val writeSettings = ConfigurablePersistentActor.Settings(readSettings.nrTags)
          val loadSettings = LoadGenerator.Settings(exampleConfig)

          PekkoManagement(ctx.system).start()
          ClusterBootstrap(ctx.system).start()
          val cluster = Cluster(ctx.system)
          cluster.subscriptions ! Subscribe(ctx.self, classOf[SelfUp])

          val topic = ReadSideTopic.init(ctx)

          if (cluster.selfMember.hasRole("read")) {
            val session = CassandraSessionRegistry(ctx.system).sessionFor("pekko.persistence.cassandra")
            val offsetTableStmt =
              """
              CREATE TABLE IF NOT EXISTS pekko.offsetStore (
                eventProcessorId text,
                tag text,
                timeUuidOffset timeuuid,
                PRIMARY KEY (eventProcessorId, tag)
              )
           """

            // Await.ready only waits for completion (throwing on timeout); it does not
            // rethrow a failed future. Inspect the outcome so that a failed DDL statement
            // is reported instead of being silently ignored.
            val tableCreated = session.executeDDL(offsetTableStmt)
            Await.ready(tableCreated, 30.seconds)
            tableCreated.value.flatMap(_.failed.toOption).foreach { cause =>
              ctx.log.error("Failed to create the offset store table pekko.offsetStore", cause)
            }
          }

          Behaviors.receiveMessage {
            case SelfUp(state) =>
              ctx.log.infoN(
                "Cluster member joined. Initializing persistent actors. Roles {}. Members {}",
                cluster.selfMember.roles,
                state.members)
              val ref = ConfigurablePersistentActor.init(writeSettings, ctx.system)
              if (cluster.selfMember.hasRole("read")) {
                ctx.spawnAnonymous(Reporter(topic))
              }
              ReadSide(ctx.system, topic, readSettings)
              if (cluster.selfMember.hasRole("load")) {
                ctx.log.info("Starting load generation")
                val load = ctx.spawn(LoadGenerator(loadSettings, ref), "load-generator")
                load ! Start(10.seconds)
              }
              // Initialization is done; no further messages are handled by the guardian.
              Behaviors.empty
          }
      }, "apc-example")
  }