def main()

in core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala [289:366]


  def main(args: Array[String]): Unit = {
    implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
    implicit val actorSystem: ActorSystem =
      ActorSystem(name = "scheduler-actor-system", defaultExecutionContext = Some(ec))

    implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))

    if (useClusterBootstrap) {
      AkkaManagement(actorSystem).start()
      ClusterBootstrap(actorSystem).start()
    }

    // Prepare Kamon shutdown
    CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
      logger.info(this, s"Shutting down Kamon with coordinated shutdown")
      Kamon.stopModules().map(_ => Done)
    }

    def abort(message: String) = {
      logger.error(this, message)
      actorSystem.terminate()
      Await.result(actorSystem.whenTerminated, 30.seconds)
      sys.exit(1)
    }

    // extract configuration data from the environment
    implicit val config = new WhiskConfig(requiredProperties)
    if (!config.isValid) {
      abort("Bad configuration, cannot start.")
    }

    val port = config.servicePort.toInt
    val host = config.schedulerHost
    val rpcPort = config.schedulerRpcPort.toInt
    val akkaPort = config.schedulerAkkaPort.toInt

    // if deploying multiple instances (scale out), must pass the instance number as they need to be uniquely identified.
    require(args.length >= 1, "scheduler instance required")
    val instanceId = SchedulerInstanceId(args(0))

    initKamon(instanceId)

    val msgProvider = SpiLoader.get[MessagingProvider]

    Seq(
      (topicPrefix + "scheduler" + instanceId.asString, "scheduler", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
      (
        topicPrefix + "creationAck" + instanceId.asString,
        "creationAck",
        Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)))
      .foreach {
        case (topic, topicConfigurationKey, maxMessageBytes) =>
          if (msgProvider.ensureTopic(config, topic, topicConfigurationKey, maxMessageBytes).isFailure) {
            abort(s"failure during msgProvider.ensureTopic for topic $topic")
          }
      }

    ExecManifest.initialize(config) match {
      case Success(_) =>
        val schedulerEndpoints = SchedulerEndpoints(host, rpcPort, akkaPort)
        // Create scheduler
        val scheduler = new Scheduler(instanceId, schedulerEndpoints)

        Http()
          .newServerAt("0.0.0.0", port = rpcPort)
          .bind(scheduler.serviceHandlers)
          .foreach { _ =>
            val httpsConfig =
              if (Scheduler.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.controller.https"))
              else None

            BasicHttpService.startHttpService(FPCSchedulerServer.instance(scheduler).route, port, httpsConfig)(
              actorSystem)
          }
      case Failure(t) =>
        abort(s"Invalid runtimes manifest: $t")
    }
  }