def main()

in core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala [105:236]


  def main(args: Array[String]): Unit = {
    ConfigMXBean.register()
    implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
    implicit val actorSystem: ActorSystem =
      ActorSystem(name = "invoker-actor-system", defaultExecutionContext = Some(ec))
    implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
    val poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool)
    val limitConfig: IntraConcurrencyLimitConfig =
      loadConfigOrThrow[IntraConcurrencyLimitConfig](ConfigKeys.concurrencyLimit)
    val tags: Seq[String] = Some(loadConfigOrThrow[String](ConfigKeys.invokerResourceTags))
      .map(_.trim())
      .filter(_ != "")
      .map(_.split(",").toSeq)
      .getOrElse(Seq.empty[String])
    val dedicatedNamespaces: Seq[String] = Some(loadConfigOrThrow[String](ConfigKeys.invokerDedicatedNamespaces))
      .map(_.trim())
      .filter(_ != "")
      .map(_.split(",").toSeq)
      .getOrElse(Seq.empty[String])

    logger.info(this, s"invoker tags: (${tags.mkString(", ")})")
    // Prepare Kamon shutdown
    CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
      logger.info(this, s"Shutting down Kamon with coordinated shutdown")
      Kamon.stopModules().map(_ => Done)
    }

    // load values for the required properties from the environment
    implicit val config = new WhiskConfig(requiredProperties, optionalProperties)

    def abort(message: String) = {
      logger.error(this, message)(TransactionId.invoker)
      actorSystem.terminate()
      Await.result(actorSystem.whenTerminated, 30.seconds)
      sys.exit(1)
    }

    if (!config.isValid) {
      abort("Bad configuration, cannot start.")
    }

    val execManifest = ExecManifest.initialize(config)
    if (execManifest.isFailure) {
      logger.error(this, s"Invalid runtimes manifest: ${execManifest.failed.get}")
      abort("Bad configuration, cannot start.")
    }

    /** Returns Some(s) if the string is not empty with trimmed whitespace, None otherwise. */
    def nonEmptyString(s: String): Option[String] = {
      val trimmed = s.trim
      if (trimmed.nonEmpty) Some(trimmed) else None
    }

    // process command line arguments
    // We accept the command line grammar of:
    // Usage: invoker [options] [<proposedInvokerId>]
    //    --uniqueName <value>   a unique name to dynamically assign Kafka topics from Zookeeper
    //    --displayedName <value> a name to identify this invoker via invoker health protocol
    //    --id <value>     proposed invokerId
    //    --overwriteId <value> proposed invokerId to re-write with uniqueName in Zookeeper,
    //    DO NOT USE overwriteId unless sure invokerId does not exist for other uniqueName
    def parse(ls: List[String], c: CmdLineArgs): CmdLineArgs = {
      ls match {
        case "--uniqueName" :: uniqueName :: tail =>
          parse(tail, c.copy(uniqueName = nonEmptyString(uniqueName)))
        case "--displayedName" :: displayedName :: tail =>
          parse(tail, c.copy(displayedName = nonEmptyString(displayedName)))
        case "--id" :: id :: tail if Try(id.toInt).isSuccess =>
          parse(tail, c.copy(id = Some(id.toInt)))
        case "--overwriteId" :: overwriteId :: tail if Try(overwriteId.toInt).isSuccess =>
          parse(tail, c.copy(overwriteId = Some(overwriteId.toInt)))
        case Nil => c
        case _   => abort(s"Error processing command line arguments $ls")
      }
    }
    val cmdLineArgs = parse(args.toList, CmdLineArgs())
    logger.info(this, "Command line arguments parsed to yield " + cmdLineArgs)

    val assignedInvokerId = cmdLineArgs match {
      // --id is defined with a valid value, use this id directly.
      case CmdLineArgs(_, Some(id), _, _) =>
        logger.info(this, s"invokerReg: using proposedInvokerId $id")
        id

      // --uniqueName is defined with a valid value, id is empty, assign an id via zookeeper
      case CmdLineArgs(Some(unique), None, _, overwriteId) =>
        if (config.zookeeperHosts.startsWith(":") || config.zookeeperHosts.endsWith(":") ||
            config.zookeeperHosts.equals("")) {
          abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHosts})")
        }
        new InstanceIdAssigner(config.zookeeperHosts).setAndGetId(unique, overwriteId)

      case _ => abort(s"Either --id or --uniqueName must be configured with correct values")
    }

    initKamon(assignedInvokerId)

    val topicBaseName = "invoker"
    val topicName = topicPrefix + topicBaseName + assignedInvokerId

    val maxMessageBytes = Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)
    val invokerInstance =
      InvokerInstanceId(
        assignedInvokerId,
        cmdLineArgs.uniqueName,
        cmdLineArgs.displayedName,
        poolConfig.userMemory,
        None,
        tags,
        dedicatedNamespaces)

    val msgProvider = SpiLoader.get[MessagingProvider]
    if (msgProvider
          .ensureTopic(config, topic = topicName, topicConfig = topicBaseName, maxMessageBytes = maxMessageBytes)
          .isFailure) {
      abort(s"failure during msgProvider.ensureTopic for topic $topicName")
    }

    val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
    val invoker = try {
      SpiLoader.get[InvokerProvider].instance(config, invokerInstance, producer, poolConfig, limitConfig)
    } catch {
      case e: Exception => abort(s"Failed to initialize reactive invoker: ${e.getMessage}")
    }

    val port = config.servicePort.toInt
    val httpsConfig =
      if (Invoker.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.invoker.https")) else None

    val invokerServer = SpiLoader.get[InvokerServerProvider].instance(invoker)
    BasicHttpService.startHttpService(invokerServer.route, port, httpsConfig)(actorSystem)
  }