def init()

in pekko-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala [38:116]


  def init(remotingPort: Int, pekkoManagementPort: Int, frontEndPort: Int): Unit = {
    ActorSystem(Behaviors.setup[Command] {
        ctx =>
          PekkoManagement(ctx.system.toClassic).start()
          val cluster = Cluster(ctx.system)
          val upAdapter = ctx.messageAdapter[SelfUp](_ => NodeMemberUp)
          cluster.subscriptions ! Subscribe(upAdapter, classOf[SelfUp])
          val settings = ProcessorSettings("kafka-to-sharding-processor", ctx.system.toClassic)
          ctx.pipeToSelf(UserEvents.init(ctx.system, settings)) {
            case Success(extractor) => ShardingStarted(extractor)
            case Failure(ex)        => throw ex
          }
          starting(ctx, None, joinedCluster = false, settings)
      }, "KafkaToSharding", config(remotingPort, pekkoManagementPort))

    def start(ctx: ActorContext[Command], region: ActorRef[UserEvents.Command], settings: ProcessorSettings)
        : Behavior[Command] = {
      import ctx.executionContext
      ctx.log.info("Sharding started and joined cluster. Starting event processor")
      val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(region, settings), "kafka-event-processor")
      val binding: Future[Http.ServerBinding] = startGrpc(ctx.system, frontEndPort, region)
      binding.onComplete {
        case Failure(t) =>
          ctx.self ! BindingFailed(t)
        case _ =>
      }
      running(ctx, binding, eventProcessor)
    }

    def starting(ctx: ActorContext[Command], sharding: Option[ActorRef[UserEvents.Command]], joinedCluster: Boolean,
        settings: ProcessorSettings): Behavior[Command] = Behaviors
      .receive[Command] {
        case (ctx, ShardingStarted(region)) if joinedCluster =>
          ctx.log.info("Sharding has started")
          start(ctx, region, settings)
        case (_, ShardingStarted(region)) =>
          ctx.log.info("Sharding has started")
          starting(ctx, Some(region), joinedCluster, settings)
        case (ctx, NodeMemberUp) if sharding.isDefined =>
          ctx.log.info("Member has joined the cluster")
          start(ctx, sharding.get, settings)
        case (_, NodeMemberUp) =>
          ctx.log.info("Member has joined the cluster")
          starting(ctx, sharding, joinedCluster = true, settings)
      }

    def running(ctx: ActorContext[Command], binding: Future[Http.ServerBinding], processor: ActorRef[Nothing])
        : Behavior[Command] =
      Behaviors.receiveMessagePartial[Command] {
        case BindingFailed(t) =>
          ctx.log.error("Failed to bind front end", t)
          Behaviors.stopped
      }.receiveSignal {
        case (ctx, Terminated(`processor`)) =>
          ctx.log.warn("Kafka event processor stopped. Shutting down")
          binding.map(_.unbind())(ctx.executionContext)
          Behaviors.stopped
      }

    def startGrpc(
        system: ActorSystem[_], frontEndPort: Int, region: ActorRef[UserEvents.Command]): Future[Http.ServerBinding] = {
      val mat = Materializer.createMaterializer(system.toClassic)
      val service: HttpRequest => Future[HttpResponse] =
        UserServiceHandler(new UserGrpcService(system, region))(system.toClassic)
      Http()(system.toClassic).bindAndHandleAsync(
        service,
        interface = "127.0.0.1",
        port = frontEndPort,
        connectionContext = HttpConnectionContext())(mat)

    }

    def config(port: Int, managementPort: Int): Config =
      ConfigFactory.parseString(s"""
      pekko.remote.artery.canonical.port = $port
      pekko.management.http.port = $managementPort
       """).withFallback(ConfigFactory.load())

  }