in core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala [289:366]
def main(args: Array[String]): Unit = {
implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
implicit val actorSystem: ActorSystem =
ActorSystem(name = "scheduler-actor-system", defaultExecutionContext = Some(ec))
implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
if (useClusterBootstrap) {
AkkaManagement(actorSystem).start()
ClusterBootstrap(actorSystem).start()
}
// Prepare Kamon shutdown
CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
logger.info(this, s"Shutting down Kamon with coordinated shutdown")
Kamon.stopModules().map(_ => Done)
}
def abort(message: String) = {
logger.error(this, message)
actorSystem.terminate()
Await.result(actorSystem.whenTerminated, 30.seconds)
sys.exit(1)
}
// extract configuration data from the environment
implicit val config = new WhiskConfig(requiredProperties)
if (!config.isValid) {
abort("Bad configuration, cannot start.")
}
val port = config.servicePort.toInt
val host = config.schedulerHost
val rpcPort = config.schedulerRpcPort.toInt
val akkaPort = config.schedulerAkkaPort.toInt
// if deploying multiple instances (scale out), must pass the instance number as they need to be uniquely identified.
require(args.length >= 1, "scheduler instance required")
val instanceId = SchedulerInstanceId(args(0))
initKamon(instanceId)
val msgProvider = SpiLoader.get[MessagingProvider]
Seq(
(topicPrefix + "scheduler" + instanceId.asString, "scheduler", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
(
topicPrefix + "creationAck" + instanceId.asString,
"creationAck",
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)))
.foreach {
case (topic, topicConfigurationKey, maxMessageBytes) =>
if (msgProvider.ensureTopic(config, topic, topicConfigurationKey, maxMessageBytes).isFailure) {
abort(s"failure during msgProvider.ensureTopic for topic $topic")
}
}
ExecManifest.initialize(config) match {
case Success(_) =>
val schedulerEndpoints = SchedulerEndpoints(host, rpcPort, akkaPort)
// Create scheduler
val scheduler = new Scheduler(instanceId, schedulerEndpoints)
Http()
.newServerAt("0.0.0.0", port = rpcPort)
.bind(scheduler.serviceHandlers)
.foreach { _ =>
val httpsConfig =
if (Scheduler.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.controller.https"))
else None
BasicHttpService.startHttpService(FPCSchedulerServer.instance(scheduler).route, port, httpsConfig)(
actorSystem)
}
case Failure(t) =>
abort(s"Invalid runtimes manifest: $t")
}
}