in core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala [105:236]
def main(args: Array[String]): Unit = {
ConfigMXBean.register()
implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
implicit val actorSystem: ActorSystem =
ActorSystem(name = "invoker-actor-system", defaultExecutionContext = Some(ec))
implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
val poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool)
val limitConfig: IntraConcurrencyLimitConfig =
loadConfigOrThrow[IntraConcurrencyLimitConfig](ConfigKeys.concurrencyLimit)
val tags: Seq[String] = Some(loadConfigOrThrow[String](ConfigKeys.invokerResourceTags))
.map(_.trim())
.filter(_ != "")
.map(_.split(",").toSeq)
.getOrElse(Seq.empty[String])
val dedicatedNamespaces: Seq[String] = Some(loadConfigOrThrow[String](ConfigKeys.invokerDedicatedNamespaces))
.map(_.trim())
.filter(_ != "")
.map(_.split(",").toSeq)
.getOrElse(Seq.empty[String])
logger.info(this, s"invoker tags: (${tags.mkString(", ")})")
// Prepare Kamon shutdown
CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
logger.info(this, s"Shutting down Kamon with coordinated shutdown")
Kamon.stopModules().map(_ => Done)
}
// load values for the required properties from the environment
implicit val config = new WhiskConfig(requiredProperties, optionalProperties)
def abort(message: String) = {
logger.error(this, message)(TransactionId.invoker)
actorSystem.terminate()
Await.result(actorSystem.whenTerminated, 30.seconds)
sys.exit(1)
}
if (!config.isValid) {
abort("Bad configuration, cannot start.")
}
val execManifest = ExecManifest.initialize(config)
if (execManifest.isFailure) {
logger.error(this, s"Invalid runtimes manifest: ${execManifest.failed.get}")
abort("Bad configuration, cannot start.")
}
/** Returns Some(s) if the string is not empty with trimmed whitespace, None otherwise. */
def nonEmptyString(s: String): Option[String] = {
val trimmed = s.trim
if (trimmed.nonEmpty) Some(trimmed) else None
}
// process command line arguments
// We accept the command line grammar of:
// Usage: invoker [options] [<proposedInvokerId>]
// --uniqueName <value> a unique name to dynamically assign Kafka topics from Zookeeper
// --displayedName <value> a name to identify this invoker via invoker health protocol
// --id <value> proposed invokerId
// --overwriteId <value> proposed invokerId to re-write with uniqueName in Zookeeper,
// DO NOT USE overwriteId unless sure invokerId does not exist for other uniqueName
def parse(ls: List[String], c: CmdLineArgs): CmdLineArgs = {
ls match {
case "--uniqueName" :: uniqueName :: tail =>
parse(tail, c.copy(uniqueName = nonEmptyString(uniqueName)))
case "--displayedName" :: displayedName :: tail =>
parse(tail, c.copy(displayedName = nonEmptyString(displayedName)))
case "--id" :: id :: tail if Try(id.toInt).isSuccess =>
parse(tail, c.copy(id = Some(id.toInt)))
case "--overwriteId" :: overwriteId :: tail if Try(overwriteId.toInt).isSuccess =>
parse(tail, c.copy(overwriteId = Some(overwriteId.toInt)))
case Nil => c
case _ => abort(s"Error processing command line arguments $ls")
}
}
val cmdLineArgs = parse(args.toList, CmdLineArgs())
logger.info(this, "Command line arguments parsed to yield " + cmdLineArgs)
val assignedInvokerId = cmdLineArgs match {
// --id is defined with a valid value, use this id directly.
case CmdLineArgs(_, Some(id), _, _) =>
logger.info(this, s"invokerReg: using proposedInvokerId $id")
id
// --uniqueName is defined with a valid value, id is empty, assign an id via zookeeper
case CmdLineArgs(Some(unique), None, _, overwriteId) =>
if (config.zookeeperHosts.startsWith(":") || config.zookeeperHosts.endsWith(":") ||
config.zookeeperHosts.equals("")) {
abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHosts})")
}
new InstanceIdAssigner(config.zookeeperHosts).setAndGetId(unique, overwriteId)
case _ => abort(s"Either --id or --uniqueName must be configured with correct values")
}
initKamon(assignedInvokerId)
val topicBaseName = "invoker"
val topicName = topicPrefix + topicBaseName + assignedInvokerId
val maxMessageBytes = Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)
val invokerInstance =
InvokerInstanceId(
assignedInvokerId,
cmdLineArgs.uniqueName,
cmdLineArgs.displayedName,
poolConfig.userMemory,
None,
tags,
dedicatedNamespaces)
val msgProvider = SpiLoader.get[MessagingProvider]
if (msgProvider
.ensureTopic(config, topic = topicName, topicConfig = topicBaseName, maxMessageBytes = maxMessageBytes)
.isFailure) {
abort(s"failure during msgProvider.ensureTopic for topic $topicName")
}
val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
val invoker = try {
SpiLoader.get[InvokerProvider].instance(config, invokerInstance, producer, poolConfig, limitConfig)
} catch {
case e: Exception => abort(s"Failed to initialize reactive invoker: ${e.getMessage}")
}
val port = config.servicePort.toInt
val httpsConfig =
if (Invoker.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.invoker.https")) else None
val invokerServer = SpiLoader.get[InvokerServerProvider].instance(invoker)
BasicHttpService.startHttpService(invokerServer.route, port, httpsConfig)(actorSystem)
}